#!/usr/bin/env python3
r"""
This script can be used to go over `codeql test run` expected/actual log output from
GitHub Actions, and apply patches locally to make the tests pass.

It works by forming individual patches for each test failure, and then applying them one
by one. Since Python runs some tests twice (for both Python 2 and Python 3), we do some
simple duplicate-patch detection, to avoid the failure that would come from applying the
same patch twice.

Use it by checking out an up-to-date version of your branch, and just run the script (it
will use the `gh` CLI to figure out what the PR number is). You must have the `gh` CLI
tool installed.

The parsing of logs is somewhat tolerant of output being interleaved with other parallel
execution, but it might fail in some cases ¯\_(ツ)_/¯

Code written to hack things together until they work, so don't expect much :D
"""

import argparse
import sys
import re
import tempfile
from typing import List, Dict, Set, Optional, Any
from pathlib import Path
import subprocess
import logging
import json
from dataclasses import dataclass, field
from datetime import datetime
import os.path
import itertools

LOGGER = logging.getLogger(Path(__file__).name)

DEBUG_SAVE_PATCHES = False
DEBUG_LOG_FILE = None


def _get_codeql_repo_dir() -> Path:
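    """Locate the root of the local codeql checkout.

    Assumes this script sits two directory levels below the repository root
    (hence the three `.parent` steps from the file itself).
    """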
    return Path(__file__).parent.parent.parent.resolve()


CODEQL_REPO_DIR = _get_codeql_repo_dir()

def _get_semmle_code_dir() -> Optional[Path]:
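    """Best-effort guess at a local semmle-code checkout.

    Assumes it would be the parent directory of the codeql checkout, and only accepts
    the guess if `git remote -v` run there mentions github/semmle-code.
    """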
    guess = CODEQL_REPO_DIR.parent.resolve()
    try:
        out = subprocess.check_output(
            ["git", "remote", "-v"],
            cwd=guess,
        ).decode("utf-8")
        if "github/semmle-code" in out:
            return guess
        else:
            return None
    except subprocess.CalledProcessError:
        return None


SEMMLE_CODE_DIR = _get_semmle_code_dir()
if SEMMLE_CODE_DIR is None:
    LOGGER.warning("Could not find semmle-code repo, will not be able to apply patches for it")


@dataclass(frozen=True, eq=True, order=True)
class Patch:
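    """A single-file patch for a test's .expected output.

    `filename` is the path of the .expected file relative to `dir` (the repository to
    apply the patch in), and `patch` holds the unified-diff hunk lines taken from the
    log. The class is hashable so duplicate patches (e.g. from Python 2/Python 3
    running the same test) can be detected: `patch` is a list and excluded from the
    hash, with `patch_first_line` acting as a stand-in.

    `format_as_patch` renders something `git apply` accepts, roughly (illustrative):

        --- a/some/test/dir/Foo.expected
        +++ b/some/test/dir/Foo.expected
        @@ -1,2 +1,2 @@
         unchanged row
        -old expected row
        +new actual row
    """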
    filename: Path
    dir: Optional[str]
    patch_first_line: str
    patch: List[str] = field(hash=False)

    def format_as_patch(self) -> str:
        return (f"--- a/{self.filename}\n" +
                f"+++ b/{self.filename}\n" +
                "\n".join(self.patch) +
                "\n"
                )


def parse_log_line(log_line: str) -> str:
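    """Strip the per-line prefix that GitHub Actions logs carry, returning only the payload.

    Handles both the tab-separated prefix used by `gh run view --log-failed` output
    (roughly `job <TAB> step <TAB> timestamp message`) and a plain leading-timestamp
    prefix (as in raw job logs from the API). Returns "" for lines that match neither shape.
    """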
    if '\t' in log_line:
        m = re.fullmatch(r"^(?:[^\t]+\t){2}\S+ ?(.*)$", log_line.strip())
    else:
        m = re.fullmatch(r"^(?:[^ ]+ )(.*)$", log_line.strip())

    if not m:
        return ""

    return m.group(1)


def make_patches_from_log_file(log_file_lines) -> List[Patch]:
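    """Scan `codeql test run` log output for expected/actual diffs and build a Patch per failure.

    The portion of the log this looks for is roughly (illustrative):

        --- expected
        +++ actual
        @@ -1,2 +1,2 @@
         unchanged row
        -old expected row
        +new actual row
        ... FAILED(RESULT) /home/runner/work/codeql/codeql/some/test/dir/Foo.ql

    The diff lines are collected verbatim, and the FAILED(RESULT) path (mapped from a
    known CI checkout location to a local repository) determines which .expected file
    the patch applies to.
    """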
    # TODO: If the expected output contains `Password=123` this will be masked as `***`
    # in the actions logs. We _could_ try to detect this and replace the line with the
    # actual content from file (but it would require parsing the @@ lines, to understand
    # what the old linenumber to read is)
    patches = []

    lines = iter(log_file_lines)

    for raw_line in lines:
        line = parse_log_line(raw_line)

        if line == "--- expected":
            while True:
                next_line = parse_log_line(next(lines))
                if next_line == "+++ actual":
                    break

            lines_changed = []

            while True:
                next_line = parse_log_line(next(lines))
                # it can be the case that unrelated (interleaved) output shows up here,
                # so only keep lines that look like they belong to the diff
                if next_line and next_line[0] in (" ", "-", "+", "@"):
                    lines_changed.append(next_line)
                if "FAILED" in next_line:
                    break

            # error line _should_ be next, but sometimes the output gets interleaved...
            # so we just skip until we find the error line
            error_line = next_line
            while True:
                # internal
                filename_match = re.fullmatch(r"^##\[error\].*FAILED\(RESULT\) (.*)$", error_line)
                if not filename_match:
                    # codeql action
                    filename_match = re.fullmatch(r"^.*FAILED\(RESULT\) (.*)$", error_line)
                if filename_match:
                    break
                error_line = parse_log_line(next(lines))

            full_path = filename_match.group(1)

            known_start_paths = {
                # internal CI runs
                "/home/runner/work/semmle-code/semmle-code/ql/": CODEQL_REPO_DIR,
                "/home/runner/work/semmle-code/semmle-code/target/codeql-java-integration-tests/ql/": CODEQL_REPO_DIR,
                "/home/runner/work/semmle-code/semmle-code/" : SEMMLE_CODE_DIR,
                # github actions on codeql repo
                "/home/runner/work/codeql/codeql/": CODEQL_REPO_DIR,
                "/Users/runner/work/codeql/codeql/" : CODEQL_REPO_DIR, # MacOS
            }
            for known_start_path, dir in known_start_paths.items():
                if full_path.startswith(known_start_path):
                    filename = Path(full_path[len(known_start_path):])
                    break
            else:
                raise Exception(f"Unknown path {full_path}, skipping")

            expected_filename = filename.with_suffix(".expected")

            # if the .expected file used to be empty, `codeql test run` outputs a diff
            # that uses `@@ -1,1 ` but git wants it to be `@@ -0,0 ` (and not include
            # the removal of the empty line)
            if lines_changed[0].startswith("@@ -1,1 ") and lines_changed[1] == "-":
                lines_changed[0] = lines_changed[0].replace("@@ -1,1 ", "@@ -0,0 ")
                del lines_changed[1]

            patch = Patch(
                filename=expected_filename,
                dir=dir,
                patch_first_line=lines_changed[0],
                patch=lines_changed,
            )
            patches.append(patch)

    return patches


def make_github_api_list_request(path: str) -> Any:
    """Handles the way paginate currently works, which is to return multiple lists of
    results, instead of merging them :(
    """

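    # `gh api --paginate` can emit the pages as back-to-back JSON arrays, e.g.
    # `[{...}][{...}]` (illustrative), rather than one merged array; the except
    # branch below splits on `][` and stitches the pages back together.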
    s = subprocess.check_output(
        ["gh", "api", "--paginate", path],
    ).decode("utf-8")

    try:
        return json.loads(s)
    except json.JSONDecodeError:
        assert "][" in s
        parts = s.split("][")
        parts[0] = parts[0] + "]"
        parts[-1] = "[" + parts[-1]
        for i in range(1, len(parts) - 1):
            parts[i] = "[" + parts[i] + "]"
        return itertools.chain(*[json.loads(x) for x in parts])


@dataclass
class GithubStatus():
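    """A (possibly failing) CI status for a commit.

    `state` and `context` come from the commit status / check run, `target_url` points at
    the Actions run, `nwo` is the "owner/repo" the run belongs to, and `job_ids` is filled
    in later once the numeric Actions job ids have been resolved.
    """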
    state: str
    context: str
    target_url: str
    created_at: datetime
    nwo: str
    job_ids: set = None


def get_log_content(status: GithubStatus) -> str:
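    """Download the log text for a failing status.

    Uses the per-job logs API when the numeric job ids are known, and otherwise falls
    back to `gh run view --log-failed` for the whole run.
    """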
    LOGGER.debug(f"'{status.context}': Getting logs")
    if status.job_ids:
        contents = [subprocess.check_output(
            ["gh", "api", f"/repos/{status.nwo}/actions/jobs/{job_id}/logs"],
        ).decode("utf-8") for job_id in status.job_ids]
        content = "\n".join(contents)
    else:
        m = re.fullmatch(r"^https://github\.com/([^/]+/[^/]+)/actions/runs/(\d+)(?:/jobs/(\d+))?$", status.target_url)
        nwo = m.group(1)
        run_id = m.group(2)
        content = subprocess.check_output(
            ["gh", "run", "view", "--repo", nwo, run_id, "--log-failed"],
        ).decode("utf-8")

    if DEBUG_SAVE_PATCHES:
        tmp = tempfile.NamedTemporaryFile(delete=False)
        tmp.write(content.encode(encoding="utf-8"))
        print(tmp.name)
    return content


def main(pr_number: Optional[int], sha_override: Optional[str] = None, force=False, wait_for_ci=True):
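    """Fetch CI failures for the given PR (or commit SHA) and apply the expected-output patches locally.

    Rough flow: resolve the PR head SHA and check it matches the local HEAD, collect
    failing language-test jobs from both the commit statuses API (internal CI) and the
    check-suites API (GitHub Actions), download their logs, turn the expected/actual
    diffs into patches, and apply them with `git apply`.
    """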
    if not pr_number and not sha_override:
        raise Exception("Must specify either a PR number or a SHA")

    if sha_override:
        github_sha = sha_override
    else:
        LOGGER.info(f"Getting status URL for codeql PR #{pr_number}")
        github_sha = subprocess.check_output(
            ["gh", "api", f"repos/github/codeql/pulls/{pr_number}", "--jq", ".head.sha"]
        ).decode("utf-8").strip()

    local_sha = subprocess.check_output(
        ["git", "rev-parse", "HEAD"]
    ).decode("utf-8").strip()

    if local_sha != github_sha and not force:
        LOGGER.error(f"GitHub SHA ({github_sha}) different from your local SHA ({local_sha}), sync your changes first!")
        sys.exit(1)
    sha = github_sha

    status_url = f"https://api.github.com/repos/github/codeql/commits/{sha}/statuses"
    LOGGER.info(f"Getting status details ({status_url})")
    statuses = make_github_api_list_request(status_url)
    newest_status: Dict[str, GithubStatus] = dict()

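    # A commit can carry several statuses with the same context (e.g. after re-runs),
    # so only the newest status per context is kept.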
    for status in statuses:
        created_at = datetime.fromisoformat(status["created_at"][:-1])
        key = status["context"]
        if key not in newest_status or created_at > newest_status[key].created_at:
            m = re.fullmatch(r"^https://github\.com/([^/]+/[^/]+)/actions/runs/(\d+)(?:/jobs/(\d+))?$", status["target_url"])

            newest_status[key] = GithubStatus(
                state=status["state"],
                context=status["context"],
                target_url=status["target_url"],
                created_at=created_at,
                nwo=m.group(1),
            )

    supported_internal_status_language_test_names = [
        "Java Integration Tests Linux",
        # "codeql-coding-standards Unit Tests Linux",
        # TODO: Currently disabled, would just require support for figuring out where
        # https://github.com/github/codeql-coding-standards/ is checked out locally
    ]

    lang_test_failures: List[GithubStatus] = list()
    for status in newest_status.values():
        if " Language Tests" in status.context or status.context in supported_internal_status_language_test_names:
            if status.state == "failure":
                lang_test_failures.append(status)
            elif status.state == "pending":
                if wait_for_ci:
                    LOGGER.error(f"Language tests ({status.context}) are still running, please wait for them to finish before running this script again (or run with --dont-wait)")
                    sys.exit(1)

    job_failure_urls = set()
    for lang_test_failure in lang_test_failures:
        job_failure_urls.add(lang_test_failure.target_url)

    for job_failure_url in job_failure_urls:
        # fixup URL. On the status, the target URL is the run, and it's really hard to
        # change this to link to the full `/runs/<run_id>/jobs/<numeric_job_id>` URL, since
        # the `<numeric_job_id>` is not available in a context: https://github.com/community/community/discussions/40291
        m = re.fullmatch(r"^https://github\.com/([^/]+/[^/]+)/actions/runs/(\d+)$", job_failure_url)
        nwo = m.group(1)
        run_id = m.group(2)
        jobs_url = f"https://api.github.com/repos/{nwo}/actions/runs/{run_id}/jobs"
        LOGGER.info(f"Fixing up target url from looking at {jobs_url}")
        jobs = json.loads(subprocess.check_output(["gh", "api", "--paginate", jobs_url]).decode("utf-8"))
        for lang_test_failure in lang_test_failures:
            workflow_translation = {
                "codeql-coding-standards Unit Tests Linux": "Start codeql-coding-standards"
            }
            expected_workflow_name = workflow_translation.get(lang_test_failure.context, lang_test_failure.context)

            for job in jobs["jobs"]:
                api_name: str = job["name"]

                if api_name.lower().startswith(expected_workflow_name.lower()):
                    if lang_test_failure.job_ids is None:
                        lang_test_failure.job_ids = set()
                    lang_test_failure.job_ids.add(job["id"])
                    continue

                if " / " not in api_name:
                    continue

                workflow_name, job_name = api_name.split(" / ")
                # The job names we're looking for look like "Python2 Language Tests / Python2 Language Tests" or "Java Language Tests / Java Language Tests Linux"
                # for "Java Integration Tests Linux / Java Integration tests Linux" we need to ignore case :|
                if workflow_name == expected_workflow_name and job_name.lower().startswith(lang_test_failure.context.lower()):
                    lang_test_failure.job_ids.add(job["id"])
                    continue

    for lang_test_failure in lang_test_failures:
        if lang_test_failure.job_ids is None:
            LOGGER.error(f"Could not find job for {lang_test_failure.context!r}")
            sys.exit(1)

    # Ruby/Swift/C#/Go use github actions, and not internal CI. These are not reported
    # from the /statuses API, but from the /check-suites API
    check_suites_url = f"https://api.github.com/repos/github/codeql/commits/{sha}/check-suites"
    LOGGER.info(f"Getting check suites ({check_suites_url})")
    check_suites = json.loads(subprocess.check_output(["gh", "api", "--paginate", check_suites_url]).decode("utf-8"))

    check_failure_urls = []
    for check in check_suites["check_suites"]:
        if check["status"] != "completed":
            if wait_for_ci:
                print(check)
                LOGGER.error("At least one check not completed yet!")
                sys.exit(1)

        if check["conclusion"] == "failure":
            check_failure_urls.append(check["check_runs_url"])

    for check_failure_url in check_failure_urls:
        # run information: https://docs.github.com/en/rest/actions/workflow-jobs?apiVersion=2022-11-28#get-a-job-for-a-workflow-run
        check_runs = json.loads(subprocess.check_output(["gh", "api", "--paginate", check_failure_url]).decode("utf-8"))
        for check_run in check_runs["check_runs"]:
            if check_run["conclusion"] == "failure":
                m = re.fullmatch(r"^https://github\.com/([^/]+/[^/]+)/actions/runs/(\d+)(?:/job/(\d+))?$", check_run["details_url"])
                if not m:
                    LOGGER.error(f"Could not parse details URL for {check_run['name']}: '{check_run['details_url']}'")
                    continue
                nwo = m.group(1)
                run_id = m.group(2)
                jobs_url = f"https://api.github.com/repos/{nwo}/actions/runs/{run_id}/jobs"
                LOGGER.debug(f"Looking into failure at {jobs_url}")
                jobs = json.loads(subprocess.check_output(["gh", "api", "--paginate", jobs_url]).decode("utf-8"))

                for job in jobs["jobs"]:
                    OK_WORKFLOW_NAMES = ["C#: Run QL Tests", "Go: Run Tests", "Ruby: Run QL Tests", "Swift"]
                    def ok_job_name(job_name: str) -> bool:
                        if job_name.startswith("qltest"):
                            return True
                        if job_name.startswith("Test Linux"):
                            return True
                        if job_name.startswith("integration-tests"):
                            return True
                        return False

                    if job["name"] == check_run['name'] and job["workflow_name"] in OK_WORKFLOW_NAMES and ok_job_name(job["name"]):
                        # print(check_run['name'], 'matched to', f"{job['workflow_name']} / {job['name']}")
                        lang_test_failures.append(GithubStatus(
                            state="failure",
                            context=f"{job['workflow_name']} / {job['name']}",
                            target_url=job["html_url"],
                            created_at=check_run["completed_at"],
                            nwo=nwo,
                            job_ids={job["id"]},
                        ))
                        break
                else:
                    LOGGER.debug(f"Ignoring actions failure for '{check_run['name']}' ({check_run['details_url']})")

    if not lang_test_failures:
        LOGGER.info("No language test failures found")
        return

    patches_to_apply: Set[Patch] = set()

    for failure in lang_test_failures:
        log_content = get_log_content(failure)
        LOGGER.info(f"'{failure.context}': Making patches")
        patches = make_patches_from_log_file(log_content.splitlines())

        if not patches:
            LOGGER.warning(f"No patches generated for job {failure}")
            continue

        for patch in patches:
            if patch in patches_to_apply:
                LOGGER.debug(f"Skipping duplicate patch for {patch.filename}")
                continue
            patches_to_apply.add(patch)

    if DEBUG_SAVE_PATCHES:
        sys.exit("debug save patches")

    if not patches_to_apply:
        print("No patches to apply")
        return

    semmle_code_changed = False

    for patch in patches_to_apply:
        with tempfile.NamedTemporaryFile(prefix=f"patches-", suffix=".patch") as temp:
            temp.write(patch.format_as_patch().encode(encoding="utf-8"))
            temp.flush()
            LOGGER.info(f"Applying patch for '{patch.filename}'")
            try:
                if patch.dir is None:
                    LOGGER.warning(f"Did not find local semmle-code directory, so skipping patch for '{patch.filename}'")
                    continue

                subprocess.check_call(["git", "apply", temp.name], cwd=patch.dir)

                if "CONSISTENCY" in patch.filename.parts and patch.filename.exists():
                    # delete if empty
                    if os.path.getsize(patch.filename) == 1 and patch.filename.read_text() == "\n":
                        os.remove(patch.filename)
                        LOGGER.info(f"Deleted empty CONSISTENCY file '{patch.filename}'")

                if patch.dir == SEMMLE_CODE_DIR:
                    semmle_code_changed = True
            except subprocess.CalledProcessError:
                LOGGER.error(f"Could not apply patches for '{patch.filename}' '{patch.patch[0]}', skipping")
                tmp_keep = tempfile.NamedTemporaryFile(delete=False)
                tmp_keep.write(patch.format_as_patch().encode(encoding="utf-8"))
                LOGGER.error(f"Patch saved to {tmp_keep.name}")

    if semmle_code_changed:
        print("Expected output in semmle-code changed!")


def printHelp():
    print("""Usage:
    python3 accept-expected-changes-from-ci.py [PR-number|SHA]

Example invocations:
    $ python3 accept-expected-changes-from-ci.py 1234
    $ python3 accept-expected-changes-from-ci.py d88a8130386b720de6cac747d1bd2dd527769467

Requirements:
 - The 'gh' command line tool must be installed and authenticated.
 - The CI check must have finished.
""")


if __name__ == "__main__":
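    # Logging setup: prefer coloredlogs when it is installed, otherwise fall back to basicConfig.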
    level = logging.INFO

    try:
        import coloredlogs
        coloredlogs.install(level, fmt="%(levelname)s: %(message)s")
    except ImportError:
        logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    # parse command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--force", action="store_true", help="Apply patches even if the local SHA is different from the GitHub PR SHA")
    parser.add_argument("--dont-wait", dest="wait_for_ci", action="store_false", help="Do not wait for all CI jobs to finish")
    parser.add_argument("posarg", nargs="?", default=None)

    if DEBUG_LOG_FILE:
        patches = make_patches_from_log_file(open(DEBUG_LOG_FILE, "r").readlines())
        for patch in patches:
            if True:
                tmp_keep = tempfile.NamedTemporaryFile(delete=False)
                tmp_keep.write(patch.format_as_patch().encode(encoding="utf-8"))
                LOGGER.info(f"Patch for {patch.filename} saved to {tmp_keep.name}")
        sys.exit(1)

    os.chdir(CODEQL_REPO_DIR)

    pr_number = None
    override_sha = None
    args = parser.parse_args()

    if args.posarg is None:
        try:
            pr_number_response = subprocess.check_output([
                "gh", "pr", "view", "--json", "number"
            ]).decode("utf-8")
            pr_number = json.loads(pr_number_response)["number"]
        except:
            print("Could not auto detect PR number.")
            print("")
            printHelp()
            sys.exit(1)
    else:
        if len(args.posarg) > 10:
            override_sha = args.posarg
        else:
            pr_number = int(args.posarg)

    main(pr_number, override_sha, force=args.force, wait_for_ci=args.wait_for_ci)