Post changes in generated BigQuery schema to comments (#525)

* Add initial script for BigQuery CI

* Generate bq schemas for head and base revisions

* Refactor main routine into checkout_transpile_schema

* Add diff between versions of bigquery schemas

* Add test for checkout transpile schemas

* Add tests for the diff

* Remove hindsight and add jsonschema-transpiler

* Add script to run a shell with current directory mounted

* Run bigquery script and persist to ci workspace

* Enable docker_caching_layer

* Add an upstream branch to the ci checkout

* Add options for specifying head and base ref

* Checkout the reference instead of revision when restoring state

* Use upstream/master as the base reference for ci

* Use the correct app name when copying back to circleci

* Specify relative path to folder with results

* Add dependency to test for posting artifacts

* Update post-artifact for BigQuery diff

* Move section referring to head/base revisions into validation report

* Update post-artifact to expand if diff is present

* Use contextmanager for managing current git state

* Add docstrings

* Add tests for the bigquery script to ci

* Use a tagged version of jsonschema-transpiler

* Fix test due to lack of stashing in nested context

* Update scripts/bigquery.py

Co-Authored-By: Frank Bertsch <fbertsch@mozilla.com>

* Update .circleci/post-artifact.js

Co-Authored-By: Anna Scholtz <anna@scholtzan.net>

* Rename bigquery to bigquery_schema_diff

* Update README with instructions on running bigquery_schema_diff

* Assert failure if untracked files in schema directories

* Use find instead of filter in post-artifact

Co-authored-by: Frank Bertsch <fbertsch@mozilla.com>
Co-authored-by: Anna Scholtz <anna@scholtzan.net>
Anthony Miyaguchi 2020-04-24 12:26:19 -07:00 committed by GitHub
Parent ff3ed926f0
Commit cfd06fee6d
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
7 changed files: 543 additions and 46 deletions

.circleci/config.yml

@@ -15,7 +15,13 @@ jobs:
       - image: docker:stable-git
     steps:
       - checkout
-      - setup_remote_docker
+      - run: &checkout_upstream
+          name: Checkout upstream
+          command: |
+            git remote add upstream git@github.com:mozilla-services/mozilla-pipeline-schemas.git
+            git fetch --all
+      - setup_remote_docker:
+          docker_layer_caching: true
       - run:
           name: Build Docker image
           command: docker build -t mps .
@@ -28,15 +34,26 @@ jobs:
       - run:
           name: Verify set version in telemetry namespace is correct
           command: docker run -w /app mps scripts/assert-telemetry-version
+      - run:
+          name: Test BigQuery schema generation for validation
+          command:
+            docker run -w /app mps pytest scripts/bigquery_schema_diff.py
+      - run:
+          name: Generate diff artifacts for bigquery
+          command: |
+            docker run --name mps -w /app mps scripts/bigquery_schema_diff.py \
+              --base-ref upstream/master
+            docker cp mps:/app/integration /tmp/integration
+      - persist_to_workspace:
+          root: /tmp
+          paths:
+            - integration
   integrate:
     executor: edge-validator
     steps:
       - checkout
-      - run: &checkout_upstream
-          name: Checkout upstream
-          command: |
-            git remote add upstream git@github.com:mozilla-services/mozilla-pipeline-schemas.git
-            git fetch --all
+      - run:
+          <<: *checkout_upstream
       - setup_remote_docker
       - restore_cache:
           key: mps-integration-data-v2

@@ -111,6 +128,7 @@ workflows:
             ignore: /pull\/[0-9]+/
       - post-artifacts:
           requires:
+            - test
             - integrate
           filters:
             branches:

.circleci/post-artifact.js

@@ -7,35 +7,71 @@
 const fs = require("fs");
 const bot = require("circle-github-bot").create();
 
-let root = "/tmp/test-reports";
-let files = fs.readdirSync(root);
-console.log(files);
+function validation_report() {
+  let root = "/tmp/test-reports";
+  let files = fs.readdirSync(root);
+  console.log(files);
 
-// An example listing of files. The diff is created by comparing differences
-// between the upstream commit (mozilla-pipeline-schemas/master) and the report
-// generated from the PR
-//
-// ["723350e.report.json", "c88ebe5.report.json", "c88ebe5-723350e.diff"]
-let diff_file = files.filter(x => x.endsWith(".diff"))[0];
-let diff_content = fs.readFileSync(root + "/" + diff_file, "utf8");
-let [upstream, head] = diff_file.split(".")[0].split("-");
+  // An example listing of files. The diff is created by comparing differences
+  // between the upstream commit (mozilla-pipeline-schemas/master) and the report
+  // generated from the PR
+  //
+  // ["723350e.report.json", "c88ebe5.report.json", "c88ebe5-723350e.diff"]
+  let diff_file = files.find(x => x.endsWith(".diff"));
+  let diff_content = fs.readFileSync(root + "/" + diff_file, "utf8");
+  let [upstream, head] = diff_file.split(".")[0].split("-");
 
+  var body = "No content detected.";
+  if (diff_content) {
+    body = `<details>
+<summary>Click to expand!</summary>
-// Generate and post markdown
-var content = `#### \`${diff_file}\`
 \`\`\`diff
 ${diff_content}
 \`\`\`
+</details>
 `;
+  }
-if (!diff_content) {
-  content = "No changes detected.";
+
+  // Generate and post markdown
+  let content = `
+[Report for upstream](${bot.env.buildUrl}/artifacts/0/app/test-reports/${upstream}.report.json)
+[Report for latest commit](${bot.env.buildUrl}/artifacts/0/app/test-reports/${head}.report.json)
+
+#### \`${diff_file}\`
+${body}
+`;
+  return content;
 }
+
+function bigquery_diff() {
+  let root = "/tmp/integration";
+  let files = fs.readdirSync(root);
+  console.log(files);
+
+  let diff_file = files.find(x => x.endsWith(".diff"));
+  let diff_content = fs.readFileSync(root + "/" + diff_file, "utf8");
+
+  var body = "No content detected."
+  if (diff_content) {
+    body = `<details>
+<summary>Click to expand!</summary>
+
+\`\`\`diff
+${diff_content}
+\`\`\`
+</details>
+`
+  }
+
+  var content = `#### \`${diff_file}\`
+${body}
+`;
+  return content;
+}
 
 bot.comment(process.env.GH_AUTH_TOKEN, `
 ### Integration report for "${bot.env.commitMessage}"
-[Report for upstream](${bot.env.buildUrl}/artifacts/0/app/test-reports/${upstream}.report.json)
-[Report for latest commit](${bot.env.buildUrl}/artifacts/0/app/test-reports/${head}.report.json)
-${content}
+${validation_report()}
+${bigquery_diff()}
 `);

Dockerfile

@@ -16,24 +16,14 @@ RUN dnf -y update && \
     python36 \
     java-1.8.0-openjdk-devel \
     maven \
+    cargo \
     && dnf clean all
 
-WORKDIR /downloads
-
-# Install hindsight and the luasandbox
-RUN wget -qO- https://s3-us-west-2.amazonaws.com/net-mozaws-data-us-west-2-ops-ci-artifacts/mozilla-services/lua_sandbox_extensions/master/centos7/all.tgz | tar xvz
-RUN wget https://s3-us-west-2.amazonaws.com/net-mozaws-data-us-west-2-ops-ci-artifacts/mozilla-services/lua_sandbox_extensions/external/centos7/parquet-cpp-1.3.1-1.x86_64.rpm
-RUN dnf -y install \
-    hindsight-0* \
-    luasandbox-1* \
-    luasandbox-cjson* \
-    luasandbox-lfs* \
-    luasandbox-lpeg* \
-    luasandbox-parquet* \
-    luasandbox-rjson* \
-    parquet-cpp* \
-    && dnf clean all
+ENV PATH=$PATH:/root/.cargo/bin
+RUN cargo install jsonschema-transpiler --version 1.8.0
+
+# Configure git for testing
+RUN git config --global user.email "mozilla-pipeline-schemas@mozilla.com"
+RUN git config --global user.name "Mozilla Pipeline Schemas"
 
 WORKDIR /app
 COPY . /app

README.md

@@ -65,14 +65,11 @@ in the form `<ping type>.<version>.<test name>.pass.json` for documents expected
 `<ping type>.<version>.<test name>.fail.json` for documents expected to fail validation.
 The `test name` should match the pattern `[0-9a-zA-Z_]+`
 
-To run the tests:
+To run the tests, make use of the wrapper scripts:
 
 ```bash
-# build the container with the pipeline schemas
-docker build -t mps .
-# run the tests
-docker run --rm mps
+./scripts/mps-build
+./scripts/mps-test
 ```
 
 ### Packaging and integration tests (optional)

@@ -98,6 +95,35 @@ pytest -k telemetry/main.4
 pytest -k java
 ```
 
+To generate a diff of BigQuery schemas, run the `bigquery_schema_diff.py` script.
+
+```bash
+# optionally, enter the mozilla-pipeline-schemas environment
+# for jsonschema-transpiler and python3 dependencies
+./scripts/mps-shell
+
+# generate an integration folder; the options default to HEAD and master,
+# respectively
+./scripts/bigquery_schema_diff.py --base-ref master --head-ref HEAD
+```
+
+This generates an `integration` folder:
+
+```bash
+integration
+├── bq_schema_f59ca95-d502688.diff
+├── d502688
+│   ├── activity-stream.events.1.bq
+│   ├── activity-stream.impression-stats.1.bq
+...
+│   └── webpagetest.webpagetest-run.1.bq
+└── f59ca95
+    ├── activity-stream.events.1.bq
+    ├── activity-stream.impression-stats.1.bq
+...
+    └── webpagetest.webpagetest-run.1.bq
+```
+
+Pushes to the main repo will trigger integration tests in CircleCI that directly
+compare the revision to the `master` branch. These tests do not run for forked PRs
+in order to protect data and credentials, but reviewers can trigger tests to run

scripts/bigquery_schema_diff.py Executable file

@@ -0,0 +1,415 @@
#!/usr/bin/env python3
"""Test the differences in BigQuery schemas between two revisions in schema repository history. This is
for diagnostic purposes while writing schemas.
"""
import json
import os
import shutil
import subprocess
import tempfile
import argparse
from contextlib import contextmanager
from pathlib import Path
from typing import List, Tuple, Union
import pytest
ROOT = Path(__file__).parent.parent
def run(command: Union[str, List[str]], **kwargs) -> str:
"""Simple wrapper around subprocess.run that returns stdout and raises exceptions on errors."""
if isinstance(command, list):
args = command
elif isinstance(command, str):
args = command.split()
else:
raise RuntimeError(f"run command is invalid: {command}")
# TODO: log the output
return (
subprocess.run(args, stdout=subprocess.PIPE, **{**dict(check=True), **kwargs})
.stdout.decode()
.strip()
)
def transpile(schema_path: Path) -> dict:
"""Transpile a JSON Schema into a BigQuery schema."""
res = run(
[
"jsonschema-transpiler",
str(schema_path),
"--normalize-case",
"--resolve",
"cast",
"--type",
"bigquery",
]
)
schema = json.loads(res)
return schema
def transform(document: dict) -> dict:
"""Transform a document for loading into BigQuery."""
# TODO: normalize field names using snake casing
# TODO: additional properties
# TODO: pseudo maps
# TODO: anonymous structs
raise NotImplementedError()
def transpile_schemas(output_path: Path, schema_paths: List[Path]):
"""Write schemas to directory."""
assert output_path.is_dir()
for path in schema_paths:
namespace, doctype, filename = path.parts[-3:]
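        # e.g. schemas/telemetry/main/main.4.schema.json yields the parts
        # ("telemetry", "main", "main.4.schema.json") and version 4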
version = int(filename.split(".")[-3])
# pioneer-study schemas were done incorrectly and are ignored here
if namespace == "schemas":
print(f"skipping {path} due to wrong directory level")
continue
out = output_path / f"{namespace}.{doctype}.{version}.bq"
with out.open("w") as fp:
print(f"writing {out}")
json.dump(transpile(path), fp, indent=2)
fp.write("\n")
def load_schemas(input_path: Path) -> dict:
"""Load schemas into memory for use in google-cloud-bigquery."""
paths = list(input_path.glob("*.bq"))
assert len(paths) > 0
schemas = {}
for path in paths:
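        # strip the ".bq" suffix, e.g. "telemetry.main.4.bq" -> "telemetry.main.4"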
qualified_name = path.parts[-1][:-3]
with path.open("r") as fp:
schemas[qualified_name] = json.load(fp)
print(f"loaded {len(schemas.keys())} schemas")
return schemas
def git_stash_size() -> int:
"""Find the size of the git stash."""
return len([item for item in run("git stash list").split("\n") if item])
def git_untracked_files(directories=["schemas", "templates"]) -> List[str]:
"""Return a list of untracked files within specific directories."""
untracked = run(["git", "ls-files", "--others", "--exclude-standard", *directories])
return [item for item in untracked.split("\n") if item]
def resolve_ref(ref: str) -> str:
"""Return a resolved reference or the short revision if empty."""
resolved = run(f"git rev-parse --abbrev-ref {ref}") or run(
f"git rev-parse --short {ref}"
)
if resolved != ref:
print(f"resolved {ref} to {resolved}")
return resolved
@contextmanager
def managed_git_state():
"""Save the current git state.
Stash any changes so we can reference by real changes in the tree. If the
branch has in-flight changes, the changes would be ignored by the stash.
"""
original_ref = run("git rev-parse --abbrev-ref HEAD")
before_stash_size = git_stash_size()
run("git stash")
should_apply_stash = before_stash_size != git_stash_size()
if should_apply_stash:
print(
"NOTE: uncommitted files have been detected. These will be ignored during comparisons."
)
try:
yield
finally:
run(f"git checkout {original_ref}")
if should_apply_stash:
run("git stash apply")
# if apply fails, then an exception is thrown and the stash still
# exists
run("git stash drop")
def _checkout_transpile_schemas(schemas: Path, ref: str, output: Path) -> Path:
"""Checkout a revision, transpile schemas, and return to the original revision.
Generates a new folder under output with the short revision of the reference.
"""
# preconditions
    assert output.is_dir(), f"output must be a directory: {output}"
    assert (
        len(run("git diff")) == 0
    ), "current git state must be clean, please stash changes"
    assert not git_untracked_files(), (
        "untracked files detected in schema directories, please check them in: "
        + ", ".join(git_untracked_files())
    )
rev = run(f"git rev-parse --short {ref}")
print(f"transpiling schemas for ref: {ref}, rev: {rev}")
# directory structure uses the short revision
rev_path = output / rev
rev_path.mkdir()
with managed_git_state():
# checkout and generate schemas
run(f"git checkout {ref}")
transpile_schemas(rev_path, schemas.glob("**/*.schema.json"))
return rev_path
def checkout_transpile_schemas(
schemas: Path, head_ref: str, base_ref: str, outdir: Path
) -> Tuple[Path, Path]:
"""Generate schemas for the head and base revisions of the repository. This will
generate a folder containing the generated BigQuery schemas under the
outdir.
"""
# generate a working path that can be thrown away if errors occur
workdir = Path(tempfile.mkdtemp())
# resolve references (e.g. HEAD) to their branch or tag name if they exist
resolved_head_ref = resolve_ref(head_ref)
resolved_base_ref = resolve_ref(base_ref)
with managed_git_state():
head_rev_path = _checkout_transpile_schemas(schemas, resolved_head_ref, workdir)
base_rev_path = _checkout_transpile_schemas(schemas, resolved_base_ref, workdir)
# copy into the final directory atomically
if not outdir.exists():
outdir.mkdir()
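    # create-then-remove ensures rmtree succeeds whether or not outdir existed;
    # copytree requires that the destination does not exist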
shutil.rmtree(outdir)
shutil.copytree(workdir, outdir)
return outdir / head_rev_path.parts[-1], outdir / base_rev_path.parts[-1]
def write_schema_diff(head: Path, base: Path, output: Path) -> Path:
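    """Write the diff between two folders of generated schemas and return its path."""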
# passing the revision in the path may not be the most elegant solution
head_rev = head.parts[-1]
base_rev = base.parts[-1]
diff_path = output / f"bq_schema_{base_rev}-{head_rev}.diff"
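    # diff exits nonzero when the directories differ, so don't raise on that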
diff_contents = run(f"diff {base} {head}", check=False)
with diff_path.open("w") as fp:
fp.write(diff_contents)
return diff_path
# TODO: options --use-document-sample
def main():
"""
TODO:
```
create a dataset with the base git revision
f"rev_{base_revision}"
if dataset does not exist:
for each schema:
create a table for each schema in base revision:
f"{namespace}__{doctype}_v{version}"
insert transformed documents from base revision
get the set of modified schemas or validation documents between revisions
for each schema in modified set:
initialize head table with schema of the base table
f"rev_{head_revision}__{namspace}__{doctype}_v{version}"
insert transformed documents from head revision
evolve schema to head revision
insert transformed documents from head revision
generate diff of `SELECT *` from base vs head
generate artifacts for CI
on user consent:
report artifact to structured ingestion
```
"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--base-ref", default="master", help="Reference to base commit e.g. master"
)
parser.add_argument(
"--head-ref", default="HEAD", help="Reference to the head commit e.g. HEAD"
)
args = parser.parse_args()
# check that the correct tools are installed
run("jsonschema-transpiler --version")
schemas = ROOT / "schemas"
integration = ROOT / "integration"
head_rev_path, base_rev_path = checkout_transpile_schemas(
schemas, args.head_ref, args.base_ref, integration
)
write_schema_diff(head_rev_path, base_rev_path, integration)
def test_preconditions():
    # glob returns a lazy generator that is always truthy, so materialize it
    assert list(
        (ROOT / "schemas").glob("**/*.schema.json")
    ), "must contain at least one schema"
    assert list(
        (ROOT / "validation").glob("**/*.pass.json")
    ), "must contain at least one passing validation document"
def test_transpile(tmp_path):
test_schema = {"type": "string"}
expected_schema = [{"mode": "REQUIRED", "name": "root", "type": "STRING"}]
test_schema_path = tmp_path / "test.json"
with test_schema_path.open("w") as fp:
json.dump(test_schema, fp)
assert transpile(test_schema_path) == expected_schema
@pytest.fixture
def tmp_git(tmp_path: Path) -> Path:
"""Copy the entire repository with the current revision.
To check the state of a failed test, change directories to the temporary
directory surfaced by pytest.
"""
curdir = os.getcwd()
origin = ROOT
workdir = tmp_path / "mps"
resolved_head_ref = resolve_ref("HEAD")
run(f"git clone {origin} {workdir}")
os.chdir(workdir)
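    # a fresh clone checks out the default branch, so switch to the revision
    # that the outer repository currently has checked out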
run(f"git checkout {resolved_head_ref}")
yield workdir
os.chdir(curdir)
def test_dummy_git_env(tmp_git: Path):
assert Path(run("git remote get-url origin")) == ROOT
assert tmp_git != ROOT
def test_git_untracked_files(tmp_git: Path):
assert not git_untracked_files()
# schemas folder is checked by default
run("touch schemas/new_file")
# but not the tests directory
run("touch tests/new_file")
assert git_untracked_files() == ["schemas/new_file"]
assert git_untracked_files(directories=["schemas", "tests"]) == [
"schemas/new_file",
"tests/new_file",
]
def test_managed_git_state(tmp_git: Path):
original = run("git rev-parse HEAD")
with managed_git_state():
run("git checkout HEAD~1")
assert run("git rev-parse HEAD") != original
assert run("git rev-parse HEAD") == original
def test_managed_git_state_stash(tmp_git: Path):
"""Assert that top level stash is maintained when no changes are made during visits of revisions."""
filename = tmp_git / "README.md"
original = run("git rev-parse HEAD")
filename.open("w+").write("test")
diff = run("git diff")
assert len(diff) > 0, run("git status")
assert git_stash_size() == 0
with managed_git_state():
assert git_stash_size() == 1
run("git checkout HEAD~1")
with managed_git_state():
assert git_stash_size() == 1
run("git checkout HEAD~1")
assert git_stash_size() == 0
assert run("git rev-parse HEAD") == original
assert run("git diff") == diff
def test_managed_git_state_stash_with_conflict(tmp_git: Path):
"""Conflicts made during visits are NOT handled, but the stash maintains history."""
filename = tmp_git / "README.md"
original = run("git rev-parse HEAD")
filename.open("w+").write("test")
diff = run("git diff")
assert len(diff) > 0, run("git status")
assert git_stash_size() == 0
with pytest.raises(subprocess.CalledProcessError) as excinfo:
with managed_git_state():
assert git_stash_size() == 1
run("git checkout HEAD~1")
filename.open("w+").write("test1")
assert "apply" in str(excinfo.value)
assert git_stash_size() == 1
def test_checkout_transpile_schemas(tmp_git: Path, tmp_path):
test_schema = {
"type": "object",
"properties": {"first": {"type": "string"}, "second": {"type": "string"}},
}
test_schema_path = tmp_git / "schemas/test-namespace/test/test.1.schema.json"
def add_test_schema() -> Tuple[Path, Path]:
test_schema_path.parent.mkdir(parents=True, exist_ok=False)
with test_schema_path.open("w") as fp:
json.dump(test_schema, fp)
run(f"git add {test_schema_path}")
run(["git", "commit", "-m", "Add a test schema"])
return checkout_transpile_schemas(
tmp_git / "schemas", "HEAD", "HEAD~1", tmp_path / "integration"
)
def add_new_column():
test_schema["properties"]["third"] = {"type": "string"}
with test_schema_path.open("w") as fp:
json.dump(test_schema, fp)
run(f"git add {test_schema_path}")
run(["git", "commit", "-m", "Add a new column"])
return checkout_transpile_schemas(
tmp_git / "schemas", "HEAD", "HEAD~1", tmp_path / "integration"
)
def get_bq_names(path: Path) -> set:
return {p.name for p in path.glob("*.bq")}
head, base = add_test_schema()
assert len(list(base.glob("*.bq"))) > 0
assert get_bq_names(head) - get_bq_names(base) == {
"test-namespace.test.1.bq"
}, "new schema not detected"
diff = write_schema_diff(head, base, tmp_path / "integration").open().readlines()
assert (
len(diff) == 1 and "only in" in diff[0].lower()
), "a single line with addition expected"
head, base = add_new_column()
diff = write_schema_diff(head, base, tmp_path / "integration").open().readlines()
assert all(line[0] != "<" for line in diff), "diff contains removal"
assert any(
(">" in line) and ("third" in line) for line in diff
), "diff does not contain new column"
if __name__ == "__main__":
main()

scripts/mps-shell Executable file

@@ -0,0 +1,11 @@
#!/bin/bash
# example usage: scripts/mps-shell
set -e
cd "$(dirname "$0")/.."
docker run \
--rm \
--volume "$(pwd)":/app \
-it mozilla-pipeline-schemas:latest \
bash

scripts/mps-test

@@ -8,6 +8,7 @@ cd "$(dirname "$0")/.."
 # TODO: only mount templates (r), schemas (w), validation (r), tests (r)
 # TODO: generate schemas from mounted templates, write to mounted schemas
 docker run \
+    --rm \
     --volume "$(pwd)":/app \
     --entrypoint pytest \
     -it mozilla-pipeline-schemas:latest \