Add scripts and workflow for publishing to CVE Services (#122)

This commit is contained in:
Malte Jürgens 2023-09-11 13:39:21 +02:00 коммит произвёл GitHub
Родитель 63d0f74ad2
Коммит b6057f4cfa
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
11 изменённых файлов: 866 добавлений и 148 удалений

34
.github/workflows/assign.yml поставляемый Normal file
Просмотреть файл

@ -0,0 +1,34 @@
name: Assign CVE IDs
on: workflow_dispatch
jobs:
assign:
name: Assign CVE IDs
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- name: Clone repository
uses: actions/checkout@v3
with:
fetch-depth: 0 # Fetch all commits, needded for timestamps
- name: Set up git
run: |
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"
- name: Set up Python 3
uses: actions/setup-python@v4
with:
python-version: "3.11"
- name: Install Python dependencies
run: pip install ./
- name: Run Advisories Checks
run: check_advisories --all
- name: Assign CVE IDs
run: assign_cve_ids
env:
CVE_API_KEY: ${{ secrets.CVE_API_KEY }}
CVE_ENV: ${{ vars.CVE_ENV }}
CVE_ORG: ${{ vars.CVE_ORG }}
CVE_USER: ${{ vars.CVE_USER }}

24
.github/workflows/ci.yml поставляемый
Просмотреть файл

@ -1,24 +0,0 @@
name: Check Advisories
on:
# Triggers the workflow on push events only for the default branch
pull_request:
push:
branches:
- master
jobs:
fetch:
name: Check Advisories
runs-on: ubuntu-latest
steps:
- name: Clone repository
uses: actions/checkout@v3
- name: Set up Python 3
uses: actions/setup-python@v3
with:
python-version: '3.9'
- name: Install Python dependencies
run: pip install ./
- name: Run Advisories Checks
run: check_advisories --all

54
.github/workflows/push.yml поставляемый Normal file
Просмотреть файл

@ -0,0 +1,54 @@
name: Push
on:
# Triggers the workflow on push events only for the default branch
pull_request:
push:
branches:
- master
jobs:
check:
name: Check Advisories
runs-on: ubuntu-latest
steps:
- name: Clone repository
uses: actions/checkout@v3
- name: Set up Python 3
uses: actions/setup-python@v4
with:
python-version: "3.11"
- name: Install Python dependencies
run: pip install ./
- name: Run Advisories Checks
run: check_advisories --all
publish:
name: Publish CVE Advisories
needs: check
if: false && github.ref == 'refs/heads/master' && github.repository == 'mozilla/foundation-security-advisories'
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- name: Clone repository
uses: actions/checkout@v3
with:
fetch-depth: 0 # Fetch all commits, needded for timestamps
- name: Set up git
run: |
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"
- name: Set up Python 3
uses: actions/setup-python@v4
with:
python-version: "3.11"
- name: Install Python dependencies
run: pip install ./
- name: Publish Advisories
run: publish_cve_advisories
env:
CVE_API_KEY: ${{ secrets.CVE_API_KEY }}
CVE_ENV: ${{ vars.CVE_ENV }}
CVE_ORG: ${{ vars.CVE_ORG }}
CVE_USER: ${{ vars.CVE_USER }}

5
.gitignore поставляемый
Просмотреть файл

@ -1 +1,4 @@
debuglog.*
debuglog.*
*.egg-info
/build
__pycache__

Просмотреть файл

@ -0,0 +1,38 @@
#!/usr/bin/env python3
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import os
import sys
import subprocess
from foundation_security_advisories.common import (
CVEAdvisory,
)
from foundation_security_advisories.common_cve import *
def main():
local_cve_advisories: dict[str, CVEAdvisory] = get_local_cve_advisories()
for cve_id in local_cve_advisories:
cve_advisory = local_cve_advisories[cve_id]
if cve_id.startswith("MFSA-RESERVE"):
print(f"\n-> {cve_id}")
replace_cve_id(cve_advisory)
if os.getenv("CI"):
subprocess.run(
[
"git",
"commit",
"-m",
f"Assign CVE ids",
]
)
subprocess.run(["git", "push"])
if __name__ == "__main__":
sys.exit(main())

Просмотреть файл

@ -11,27 +11,22 @@ git pre-commit hook.
from __future__ import unicode_literals, print_function
import argparse
import codecs
import fnmatch
import os
import re
import sys
from datetime import date
from glob import glob
from subprocess import check_output
import yaml
from dateutil.parser import parse as parsedate
from markdown import markdown
from schema import Schema, Regex, Optional, Or, SchemaError
from foundation_security_advisories.common import (
HOF_FILENAME_RE,
parse_md_file,
parse_yml_file,
get_all_files,
get_modified_files
)
GIT = os.getenv('GIT_BIN', 'git')
ADVISORIES_DIR = 'announce'
HOF_DIR = 'bug-bounty-hof'
CVE_RE = re.compile('^(CVE|MFSA-TMP)-20[0-9]{2}-[0-9]{4,9}$')
MFSA_FILENAME_RE = re.compile('mfsa(\d{4}-\d{2,3})\.(md|yml)$')
HOF_FILENAME_RE = re.compile('bug-bounty-hof/\w+\.yml$')
CVE_RE = re.compile('^(CVE|MFSA-TMP|MFSA-RESERVE)-20[0-9]{2}-[0-9]{4,9}$')
md_schema = Schema({
'mfsa_id': str,
'fixed_in': [str],
@ -64,58 +59,6 @@ yaml_schema = Schema({
})
def mfsa_id_from_filename(filename):
match = MFSA_FILENAME_RE.search(filename)
if match:
return match.group(1)
return None
def git_diff(staged):
"""
Return the modified files in the repo.
:param staged: boolean return only those changes staged in git
:return: list modified file names.
"""
command = [GIT, 'diff', '--name-only']
if staged:
command.append('--cached')
git_out = check_output(command, universal_newlines=True).split()
return [fn for fn in git_out if
MFSA_FILENAME_RE.search(fn) or HOF_FILENAME_RE.search(fn)]
def get_modified_files(staged_only):
"""
Return the modified file names in the repo.
:param staged_only: boolean include all changes or only staged.
:return: list modified file names.
"""
staged_files = git_diff(staged=True)
if staged_only:
return staged_files
modified_files = set(staged_files)
modified_files.update(git_diff(staged=False))
return list(modified_files)
def get_all_files():
"""
Return all advisory file names in the repo.
:return: generator of file names.
"""
for root, dirnames, filenames in os.walk(ADVISORIES_DIR):
for filename in fnmatch.filter(filenames, 'mfsa*.*'):
yield os.path.join(root, filename)
for filename in glob('{}/*.yml'.format(HOF_DIR)):
yield filename
def check_hof_data(data):
if 'names' not in data:
return 'Missing required key: names'
@ -180,61 +123,6 @@ def check_file(file_name):
return None
def parse_md_front_matter(lines):
"""Return the YAML and MD sections.
:param: lines iterator
:return: str YAML, str Markdown
"""
# fm_count: 0: init, 1: in YAML, 2: in Markdown
fm_count = 0
yaml_lines = []
md_lines = []
for line in lines:
# first line we care about is FM start
if fm_count < 2 and line.strip() == '---':
fm_count += 1
continue
if fm_count == 1:
yaml_lines.append(line)
if fm_count == 2:
md_lines.append(line)
if fm_count < 2:
raise ValueError('Front Matter not found.')
return ''.join(yaml_lines), ''.join(md_lines)
def parse_yml_file(file_name):
"""Return the YAML data for file_name."""
with codecs.open(file_name, encoding='utf8') as fh:
data = yaml.safe_load(fh)
if 'mfsa_id' not in data:
mfsa_id = mfsa_id_from_filename(file_name)
if mfsa_id:
data['mfsa_id'] = mfsa_id
return data
def parse_md_file(file_name):
"""Return the YAML and MD sections for file_name."""
with codecs.open(file_name, encoding='utf8') as fh:
yamltext, mdtext = parse_md_front_matter(fh)
data = yaml.safe_load(yamltext)
if 'mfsa_id' not in data:
mfsa_id = mfsa_id_from_filename(file_name)
if mfsa_id:
data['mfsa_id'] = mfsa_id
# run it through parser in case of exception
markdown(mdtext)
return data
def main():
parser = argparse.ArgumentParser(description='Check the syntax of advisory files.')
parser.add_argument('--all', action='store_true',

Просмотреть файл

@ -0,0 +1,320 @@
#!/usr/bin/env python3
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import codecs
import fnmatch
import os
import re
from glob import glob
from subprocess import check_output
from dataclasses import dataclass, field
import yaml
from markdown import markdown
GIT = os.getenv("GIT_BIN", "git")
ADVISORIES_DIR = "announce"
HOF_DIR = "bug-bounty-hof"
MFSA_FILENAME_RE = re.compile("mfsa(\d{4}-\d{2,3})\.(md|yml)$")
HOF_FILENAME_RE = re.compile("bug-bounty-hof/\w+\.yml$")
HTML_BR_TAG_RE = re.compile(r"<br */?>")
HTML_CODE_TAG_RE = re.compile(r"</?code>")
HTML_TAG_RE = re.compile(r"<[^>]+>")
HTML_NEWLINE_RE = re.compile(r"\n")
HTML_DOUBLE_NEWLINE_RE = re.compile(r"\n\n")
def mfsa_id_from_filename(filename):
match = MFSA_FILENAME_RE.search(filename)
if match:
return match.group(1)
return None
def git_diff(staged):
"""
Return the modified files in the repo.
:param staged: boolean return only those changes staged in git
:return: list modified file names.
"""
command = [GIT, "diff", "--name-only"]
if staged:
command.append("--cached")
git_out = check_output(command, universal_newlines=True).split()
return [
fn
for fn in git_out
if MFSA_FILENAME_RE.search(fn) or HOF_FILENAME_RE.search(fn)
]
def get_modified_files(staged_only):
"""
Return the modified file names in the repo.
:param staged_only: boolean include all changes or only staged.
:return: list modified file names.
"""
staged_files = git_diff(staged=True)
if staged_only:
return staged_files
modified_files = set(staged_files)
modified_files.update(git_diff(staged=False))
return list(modified_files)
def get_all_files():
"""
Return all advisory file names in the repo.
:return: generator of file names.
"""
for root, dirnames, filenames in os.walk(ADVISORIES_DIR):
for filename in fnmatch.filter(filenames, "mfsa*.*"):
yield os.path.join(root, filename)
for filename in glob("{}/*.yml".format(HOF_DIR)):
yield filename
def parse_md_front_matter(lines):
"""Return the YAML and MD sections.
:param: lines iterator
:return: str YAML, str Markdown
"""
# fm_count: 0: init, 1: in YAML, 2: in Markdown
fm_count = 0
yaml_lines = []
md_lines = []
for line in lines:
# first line we care about is FM start
if fm_count < 2 and line.strip() == "---":
fm_count += 1
continue
if fm_count == 1:
yaml_lines.append(line)
if fm_count == 2:
md_lines.append(line)
if fm_count < 2:
raise ValueError("Front Matter not found.")
return "".join(yaml_lines), "".join(md_lines)
def parse_yml_file(file_name):
"""Return the YAML data for file_name."""
with codecs.open(file_name, encoding="utf8") as fh:
data = yaml.safe_load(fh)
if "mfsa_id" not in data:
mfsa_id = mfsa_id_from_filename(file_name)
if mfsa_id:
data["mfsa_id"] = mfsa_id
return data
def parse_md_file(file_name):
"""Return the YAML and MD sections for file_name."""
with codecs.open(file_name, encoding="utf8") as fh:
yamltext, mdtext = parse_md_front_matter(fh)
data = yaml.safe_load(yamltext)
if "mfsa_id" not in data:
mfsa_id = mfsa_id_from_filename(file_name)
if mfsa_id:
data["mfsa_id"] = mfsa_id
# run it through parser in case of exception
markdown(mdtext)
return data
def remove_newlines(content: str | None):
"""Removes markdown-style newlines. Replaces '\\n\\n' with '<br />' and '\\n' with a space ' '."""
if not content:
return None
content = HTML_DOUBLE_NEWLINE_RE.sub("<br />", content)
content = HTML_NEWLINE_RE.sub(" ", content)
return content
def remove_html_tags(content: str | None):
"""Executes `remove_newlines` and replaces <br> tags with '\\n', <code> tags with '`' and removes all other tags."""
if not content:
return None
content = remove_newlines(content)
content = HTML_BR_TAG_RE.sub("\n", content)
content = HTML_CODE_TAG_RE.sub("`", content)
content = HTML_TAG_RE.sub("", content)
return content
def comma_separated(sequence: list[str], conjunction="and"):
"""
Returns the given string list comma separated. For example: \n
["a","b","c","d"] -> "a, b, c, and d" \n
["a","b"] -> "a and b" \n
["a"] -> "a"
"""
if len(sequence) > 2:
return f"{', '.join(sequence[:-1])}, {conjunction} {sequence[-1]}"
elif len(sequence) == 2:
return f"{sequence[0]} {conjunction} {sequence[-1]}"
else:
return sequence[0]
@dataclass
class CVEAdvisory:
"""A collection of `CVEAdvisoryInstance`s with the same CVE-ID."""
id: str
year: int
instances: list["CVEAdvisoryInstance"] = field(default_factory=list)
@property
def newest_instance(self):
"""
Returns the last modified instance of this CVE advisory (determined by git commit time).
Useful for when only one of the instances is being updated with the latest information.
"""
greatest_last_modified = 0
newest_instance: CVEAdvisoryInstance = None
for instance in self.instances:
if instance.file_last_modified > greatest_last_modified:
greatest_last_modified = instance.file_last_modified
newest_instance = instance
return newest_instance
@property
def full_description(self):
return (
self.newest_instance.description.strip()
+ " This vulnerability affects "
+ comma_separated(
[
f"{instance.product} < {instance.version_fixed}"
for instance in self.instances
],
)
+ "."
)
def to_json_5_0(self):
"""
Convert advisory in yml format into
[CVE JSON 5.0](https://cveproject.github.io/cve-schema/schema/v5.0/docs) format.
"""
return {
"containers": {
"cna": {
"affected": [
{
"product": instance.product,
"vendor": "Mozilla",
"versions": [
{
"lessThan": instance.version_fixed,
"status": "affected",
"version": "unspecified",
"versionType": "custom",
}
],
}
for instance in self.instances
],
"descriptions": [
{
"lang": "en",
"value": remove_html_tags(self.full_description),
"supportingMedia": [
{
"type": "text/html",
"base64": False,
"value": remove_newlines(self.full_description),
}
],
}
],
**(
{
"problemTypes": [
{
"descriptions": [
{
"description": remove_html_tags(
self.newest_instance.title
),
"lang": "en",
"type": "text",
}
]
}
]
}
if self.newest_instance.title
else {}
),
"references": [
{
"url": f"https://www.mozilla.org/security/advisories/mfsa{mfsa_id}/"
}
for mfsa_id in sorted(
set([instance.mfsa_id for instance in self.instances])
)
]
+ [
{
"url": url,
**({"name": desc} if desc else {}),
}
for url, desc in self.newest_instance.references
],
**(
{
"credits": [
{
"lang": "en",
"value": remove_html_tags(
self.newest_instance.reporter
),
}
],
}
if self.newest_instance.reporter
else {}
),
}
},
"dataType": "CVE_RECORD",
"dataVersion": "5.0",
}
@dataclass
class CVEAdvisoryInstance:
"""
A manifestation of a CVE advisory in this repository.
Objects of this class correspond to the entries in the
`advisories:` section of the advisory YAML format.
"""
parent: CVEAdvisory
title: str
description: str
reporter: str | None
references: list[(str, str | None)]
mfsa_id: str
product: str
version_fixed: str
file_name: str
file_last_modified: int

Просмотреть файл

@ -0,0 +1,342 @@
#!/usr/bin/env python3
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import os
import sys
import subprocess
from datetime import datetime, timezone
from json import dumps
import difflib
from bisect import insort
from cvelib.cve_api import CveApi
from requests import HTTPError
from foundation_security_advisories.common import (
get_all_files,
parse_yml_file,
CVEAdvisory,
CVEAdvisoryInstance,
)
cve_api = CveApi(
username=os.getenv("CVE_USER"),
org=os.getenv("CVE_ORG"),
api_key=os.getenv("CVE_API_KEY"),
env=os.getenv("CVE_ENV"),
)
announced_cve_steps: list[str] = []
def print_cve_step(cve_id: str):
if cve_id not in announced_cve_steps:
print(f"\n-> {cve_id}")
announced_cve_steps.append(cve_id)
def publish_cve(cve_id: str, cve_json: dict):
"""
CVE Services: Publish the content for a already existing and given
CVE-ID with the given data in CVE JSON 5.0 format.
"""
cve_json["containers"]["cna"]["references"].sort(
key=lambda reference: reference["url"]
)
diff = difflib.unified_diff(
"",
dumps(cve_json, indent=2, sort_keys=True).split("\n"),
lineterm="",
fromfile=f"Remote (not yet published)",
tofile=f"Local",
)
for line in diff:
print(line)
if not prompt_yes_no(f"\nShould this content be published for {cve_id}?"):
print(f"Skipping {cve_id}")
return False
print(f"Publishing {cve_id}")
try:
cve_api.publish(cve_id, cve_json)
# The timestamp on the API needs to be younger than the commit timestamp so that
# the file does not get registered as modified.
touch_cve_id(cve_id)
except HTTPError as e:
raise Exception(f"Failed to publish {cve_id}, {e.response.text}")
def get_cve(cve_id: str):
"""CVE Services: Get CVE for the given CVE-ID."""
try:
return cve_api.show_cve_record(cve_id)
except HTTPError as e:
raise Exception(f"Failed to publish {cve_id}, {e.response.text}")
def touch_cve_id(cve_id: str):
"""CVE Services: Update the timestamp of the given CVE-ID to the current date."""
print(
f"Updating timestamp on {cve_id} to current date {pretty_date(datetime.now(tz=timezone.utc).timestamp())}"
)
return cve_api._put(f"cve-id/{cve_id}").json()
def update_published_cve(cve_id: str, cve_json: dict):
"""CVE Servies: Update the content of the given CVE-ID with the given data in CVE JSON 5.0 format."""
print(f"Updating {cve_id}")
try:
cve_api.update_published(cve_id, cve_json)
# We need to update the timestamp on the CVE-ID itself, because that is what we use
# later to check for modified files.
touch_cve_id(cve_id)
except HTTPError as e:
raise Exception(f"Failed to update {cve_id}, {e.response.text}")
def try_update_published_cve(local_cve: CVEAdvisory, local_date: int, remote_date):
"""
Check if there is a difference between the local and the remote CVE.
If there is one, update the CVE.
"""
remote_date_str = pretty_date(remote_date)
local_date_str = pretty_date(local_date)
if remote_date > local_date and not os.getenv("FORCE_UPDATE"):
return
print_cve_step(local_cve.id)
# We need to modify the remote and local json a bit to make sure we only
# detect a diff if something actually changed.
remote_cve_json = get_cve(local_cve.id)
remote_cve_json.pop("cveMetadata")
remote_cve_json_container = remote_cve_json["containers"]["cna"]
remote_cve_json_container.pop("providerMetadata")
if "x_legacyV4Record" in remote_cve_json_container:
remote_cve_json_container.pop("x_legacyV4Record")
local_cve_json = local_cve.to_json_5_0()
# If there are references which we did not add automatically, we probably don't
# want to remove them, so we move them to our to-be-published object.
remote_extra_references = list(
filter(
lambda reference: all(
not reference["url"].startswith(prefix)
for prefix in [
"https://bugzilla.mozilla.org",
"https://www.bugzilla.mozilla.org",
"https://mozilla.org",
"https://www.mozilla.org",
]
),
remote_cve_json["containers"]["cna"]["references"],
)
)
local_cve_json["containers"]["cna"]["references"].extend(remote_extra_references)
# Sort the references to make sure we detect the diff correctly.
remote_cve_json["containers"]["cna"]["references"].sort(
key=lambda reference: reference["url"]
)
local_cve_json["containers"]["cna"]["references"].sort(
key=lambda reference: reference["url"]
)
diff = difflib.unified_diff(
dumps(remote_cve_json, indent=2, sort_keys=True).split("\n"),
dumps(local_cve_json, indent=2, sort_keys=True).split("\n"),
lineterm="",
fromfile=f"Remote",
fromfiledate=remote_date_str,
tofile=f"Local ",
tofiledate=local_date_str,
)
is_unchanged = True
for line in diff:
print(line)
is_unchanged = False
if is_unchanged:
# There seems to be no actual difference, lets update the
# timestamp so that we won't be here again next time.
print(f"--- Remote\t{remote_date_str}")
print(f"+++ Local \t{local_date_str}")
print(f"Not actual difference found for {local_cve.id}")
touch_cve_id(local_cve.id)
return
if local_cve.year < 2023:
if not prompt_yes_no(
f"\nThis CVE lies before the cutoff year 2023. Should the content still be updated for {local_cve.id}?",
default=True, # CHANGEME
):
print(f"Skipping {local_cve.id} because it lies before the cutoff year")
return False
else:
if not prompt_yes_no(f"\nShould this content be updated for {local_cve.id}?"):
print(f"Skipping {local_cve.id}")
return False
update_published_cve(local_cve.id, local_cve_json)
def reserve_cve_id(year: str):
"""CVE Servies: Reserve a new CVE-ID for a given year and return that new id."""
print(f"Reserving CVE-ID for year {year}")
try:
response = cve_api.reserve(1, False, year)
except HTTPError as e:
raise Exception(f"Failed to reserve CVE-ID, {e.response.text}")
if (
"cve_ids" not in response
or len(response["cve_ids"]) != 1
or "cve_id" not in response["cve_ids"][0]
):
raise ValueError(f"API did not respond with valid CVE-ID")
return response["cve_ids"][0]["cve_id"]
def get_owned_cve_ids():
"""
CVE-Services: Get all the CVE-IDs owned by the current CNA. Returns a tuple containing:
- A list of all the owned IDs, regardless of their state
- A dictionary of all the IDs with the state `PUBLISHED`, mapped to the time
they were last modified.
"""
published_dates: dict[str, float] = {}
owned_ids = []
print("-> Fetching already owned CVE-IDs")
for cve_advisory in cve_api.list_cves():
cve_id = cve_advisory["cve_id"]
owned_ids.append(cve_id)
if cve_advisory["state"] == "PUBLISHED" or cve_advisory["state"] == "REJECTED":
published_dates[cve_id] = parse_iso_date(cve_advisory["time"]["modified"])
elif cve_advisory["state"] == "RESERVED":
continue
else:
raise ValueError(f"Invalid CVE state '{cve_advisory['state']}'")
return owned_ids, published_dates
def replace_cve_id(cve: CVEAdvisory):
"""
Replace the id of a given `CVEAdvisory` with a new CVE-ID.
Returns True if the id of the given advisory has been changed and False
if it hasn't.
"""
old_id = cve.id
if not prompt_yes_no(f"Should a new CVE-ID be reserved to replace {old_id}?"):
print(f"Skipping {old_id}")
return False
print(f"Replacing CVE-ID for {old_id}")
new_id = reserve_cve_id(cve.year)
print(f"Reserved {new_id}")
cve.id = new_id
for instance in cve.instances:
with open(instance.file_name) as r:
file_content = r.read().replace(old_id, new_id)
with open(instance.file_name, "w") as w:
w.write(file_content)
if os.getenv("CI"):
subprocess.run(["git", "add", instance.file_name])
print(f"Renamed {old_id} to {new_id}")
return True
def parse_iso_date(date_string: str):
"""
Parse the given date string in the format used by CVE Servies
and return the corresponding date as a unix timestamp.
"""
return datetime.fromisoformat(date_string).timestamp()
def pretty_date(utc_timestamp: str):
"""
Return the given Unix UTC timestamp as a string in the following format:
%Y-%m-%d %H:%M:%S UTC
"""
return datetime.fromtimestamp(utc_timestamp, timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S UTC"
)
def parse_bug(bug: dict):
"""
Parse a single given bug from the advisory YAML, and return the
corresponding URL and (optionally) description.
"""
url = str(bug["url"])
desc = str(bug["desc"]) if "desc" in bug else None
if not url.startswith("http"):
if "," in url:
url = "https://bugzilla.mozilla.org/buglist.cgi?bug_id=" + url.replace(
" ", ""
).replace(",", "%2C")
else:
url = "https://bugzilla.mozilla.org/show_bug.cgi?id=" + url
return url, desc
def prompt_yes_no(question: str, default=True):
if os.getenv("CI") or os.getenv("PROMPT_CHOOSE_DEFAULT"):
return default
try:
response = input(question + (" (Y/n)" if default else " (y/N)"))
except KeyboardInterrupt:
exit(0)
return response.strip().lower() in (["", "y", "yes"] if default else ["y", "yes"])
def get_local_cve_advisories():
"""
Get all the CVE advisories located in this repository as `CVEAdvisory`
objects. Returns a dictionary of all the local CVE-IDs mapped to
their respective `CVEAdvisory` objects.
"""
local_advisories: dict[str, CVEAdvisory] = {}
print("\n-> Checking local files")
for file_name in get_all_files():
if not file_name.endswith(".yml"):
continue
file_data: dict = parse_yml_file(file_name)
file_last_modified = int(
subprocess.run(
[
"git",
"log",
"--pretty=format:%at",
"-1",
"HEAD",
"--",
file_name,
],
capture_output=True,
).stdout.strip()
)
if "advisories" in file_data:
for cve_id in file_data["advisories"]:
cve_data = file_data["advisories"][cve_id]
if cve_id not in local_advisories:
year = int(cve_id.split("-")[-2])
local_advisories[cve_id] = CVEAdvisory(id=cve_id, year=year)
for fixed_in in file_data["fixed_in"]:
product, version_fixed = fixed_in.rsplit(None, 1)
references = [parse_bug(bug) for bug in cve_data["bugs"]]
cve_instance = CVEAdvisoryInstance(
parent=local_advisories[cve_id],
title=cve_data["title"],
description=cve_data["description"].strip(),
reporter=cve_data["reporter"],
references=references,
mfsa_id=file_data["mfsa_id"],
product=product,
version_fixed=version_fixed,
file_name=file_name,
file_last_modified=file_last_modified,
)
# We want the instances to be sorted by the msfa id to avoid pushing updates
# to the API where the only thing that changes is the order of the instances.
insort(
local_advisories[cve_id].instances,
cve_instance,
key=lambda x: x.mfsa_id,
)
return local_advisories

Просмотреть файл

@ -0,0 +1,59 @@
#!/usr/bin/env python3
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import os
import sys
import subprocess
from foundation_security_advisories.common import (
CVEAdvisory,
)
from foundation_security_advisories.common_cve import *
def main():
owned_cve_ids, published_cve_id_dates = get_owned_cve_ids()
local_cve_advisories: dict[str, CVEAdvisory] = get_local_cve_advisories()
for cve_id in local_cve_advisories:
cve_advisory = local_cve_advisories[cve_id]
if cve_id.startswith("MFSA-RESERVE"):
print_cve_step(cve_id)
if not replace_cve_id(cve_advisory):
continue
cve_id = cve_advisory.id
owned_cve_ids.append(cve_id)
if cve_id not in owned_cve_ids:
# if cve_id.startswith("CVE"):
# print_cve_step(cve_id)
# print(f"Warning: Skipping {cve_id} because we do not own it")
continue
if cve_id not in published_cve_id_dates:
print_cve_step(cve_id)
publish_cve(cve_advisory.id, cve_advisory.to_json_5_0())
else:
try_update_published_cve(
local_cve=cve_advisory,
local_date=cve_advisory.newest_instance.file_last_modified,
remote_date=published_cve_id_dates[cve_id],
)
if os.getenv("CI"):
subprocess.run(
[
"git",
"commit",
"-m",
f"Assign CVE ids",
]
)
subprocess.run(["git", "push"])
if __name__ == "__main__":
sys.exit(main())

Просмотреть файл

@ -1,5 +1,6 @@
PyYAML==5.4
PyYAML==6.0.1
Markdown==3.3.7
python-dateutil==2.4.2
schema==0.7.2
cvelib==1.2.1
requests

Просмотреть файл

@ -27,15 +27,18 @@ setup(
],
packages=find_packages(exclude=['contrib', 'docs', 'tests']),
install_requires=[
'PyYAML==5.4',
'PyYAML==6.0.1',
'Markdown',
'python-dateutil==2.8.2',
'schema==0.7.2',
'cvelib==1.2.1',
'requests'],
entry_points={
"console_scripts": [
"update_hof = foundation_security_advisories.update_hof:main",
"check_advisories = foundation_security_advisories.check_advisories:main"
"check_advisories = foundation_security_advisories.check_advisories:main",
"publish_cve_advisories = foundation_security_advisories.publish_cve_advisories:main",
"assign_cve_ids = foundation_security_advisories.assign_cve_ids:main",
]
}
)