Add query generation capability for events_daily
This is a straightforward way to share queries between datasets.
This commit is contained in:
Parent
67b8b9e350
Commit
f675ccf533
|
@ -130,6 +130,17 @@ jobs:
|
|||
- run:
|
||||
name: Verify that DAGs were correctly generated and are up-to-date
|
||||
command: git diff --exit-code
|
||||
verify-queries-up-to-date:
|
||||
docker: *docker
|
||||
steps:
|
||||
- checkout
|
||||
- *build
|
||||
- run:
|
||||
name: Generate Queries
|
||||
command: PATH="venv/bin:$PATH" script/generate_events_daily_queries
|
||||
- run:
|
||||
name: Verify that queries were correctly generated and are up-to-date
|
||||
command: git diff --exit-code
|
||||
validate-docs:
|
||||
docker: *docker
|
||||
steps:
|
||||
|
|
|
@ -6,3 +6,4 @@ rules:
|
|||
|
||||
ignore: |
|
||||
venv/
|
||||
bigquery_etl/events_daily/
|
||||
|
|
|
@ -0,0 +1,152 @@
|
|||
"""Generate query directories."""
|
||||
import os
|
||||
import yaml
|
||||
|
||||
from argparse import ArgumentParser
|
||||
from bigquery_etl.format_sql.formatter import reformat
|
||||
from dataclasses import dataclass
|
||||
from jinja2 import Environment, FileSystemLoader
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
|
||||
QUERY_FILES = {
|
||||
"init.sql",
|
||||
"metadata.yaml",
|
||||
"query.sql",
|
||||
"stored_procedure.sql",
|
||||
"udf.sql",
|
||||
"view.sql",
|
||||
}
|
||||
|
||||
|
||||
ALLOWED_FILES = QUERY_FILES | {"templating.yaml"}
|
||||
|
||||
|
||||
BASE_DIR = Path(os.path.dirname(__file__)).parent.parent
|
||||
|
||||
|
||||
@dataclass
class Template:
    """A template, to be filled with args and saved as a file.

    Attributes:
        name: File name of the template (e.g. "query.sql").
        env: Jinja2 environment used to look up and render the template.
    """

    name: str
    env: Environment

    def generate(self, write_path, args):
        """Generate this template at the specified write_path with the specified args.

        Creates ``write_path`` (and parents) if needed. ``.sql`` output is run
        through the repository SQL formatter before being written.
        """
        fpath = write_path / self.name
        print(f"...Generating {str(fpath)}")

        write_path.mkdir(parents=True, exist_ok=True)

        # Merge in a default header without mutating the caller's dict;
        # an explicit "header" in args still wins.
        render_args = {
            "header": "-- Generated by bigquery_etl/generate_queries.py",
            **args,
        }

        text = self.env.get_template(self.name).render(**render_args)

        if fpath.suffix == ".sql":
            text = reformat(text, trailing_newline=True)

        # Reuse fpath instead of recomputing write_path / self.name.
        fpath.write_text(text)
|
||||
|
||||
|
||||
@dataclass
class QueryDir:
    """A directory of templates, which will be filled depending on the templating.yaml.

    Attributes:
        name: Directory (and destination table) name.
        path: Path to the directory containing the template files.
        env: Lazily-created Jinja2 environment rooted at ``path``.
    """

    name: str
    path: Path
    env: Optional[Environment] = None

    def generate(self, write_path, dataset=None):
        """Generate this QueryDir at the specified write_path for the specified dataset.

        If ``dataset`` is None, queries are generated for every dataset
        listed in templating.yaml.
        """
        args = self.get_args()
        datasets = self.get_datasets(args, dataset)

        for template in self.get_templates():
            for _dataset in datasets:
                template.generate(write_path / _dataset / self.name, args[_dataset])

    def get_datasets(self, args, dataset=None) -> List[str]:
        """Get datasets to process, optionally filtered to ``dataset``.

        Raises:
            Exception: If no datasets remain after filtering.
        """
        datasets = list(args.keys())
        if dataset is not None:
            datasets = [d for d in datasets if d == dataset]
        if not datasets:
            raise Exception("Nothing to generate, no datasets found for " + self.name)
        return datasets

    def get_templates(self) -> List[Template]:
        """Get the names of the templates to process (files listed in QUERY_FILES)."""
        env = self.get_environment()
        return [
            Template(f.name, env)
            for f in self.path.glob("*")
            if str(f.name) in QUERY_FILES
        ]

    def get_environment(self) -> Environment:
        """Get (and cache) the Jinja2 environment for this directory."""
        if self.env is None:
            self.env = Environment(loader=FileSystemLoader(str(self.path)))
        return self.env

    def get_args(self) -> dict:
        """Get all arguments for templating, keyed per-dataset."""
        with open(self.path / "templating.yaml", "r") as f:
            # safe_load: templating.yaml is plain data; yaml.load without an
            # explicit Loader is deprecated and can construct arbitrary
            # Python objects from tagged input.
            return yaml.safe_load(f) or {}
|
||||
|
||||
|
||||
def get_query_dirs(path):
    """Walk a path to get all templated query dirs.

    A directory qualifies when its non-hidden files are a non-empty
    subset of ALLOWED_FILES.
    """
    for directory, _, files in os.walk(path):
        visible = {name for name in files if not name.startswith(".")}
        if visible and visible <= ALLOWED_FILES:
            dir_path = Path(directory)
            yield QueryDir(dir_path.name, dir_path)
|
||||
|
||||
|
||||
def generate_queries(project, path, dataset, write_dir):
    """Generate queries at the path for project.

    Args:
        project: Name of the project directory to write queries under.
        path: Directory searched recursively for templated query dirs.
        dataset: Optional dataset name; None generates all datasets.
        write_dir: Base output directory (str or Path).
    """
    # Coerce to Path: argparse delivers --write-dir as a str, and
    # str / str raises TypeError. Path(Path(...)) is a no-op.
    write_path = Path(write_dir) / project
    for query_dir in get_query_dirs(path):
        query_dir.generate(write_path, dataset)
|
||||
|
||||
|
||||
def main():
    """Generate Query directories."""
    arg_parser = ArgumentParser(description=main.__doc__)
    # All flags are optional; defaults target the shared-prod project and the
    # events_daily query templates.
    arg_parser.add_argument(
        "--project",
        default="moz-fx-data-shared-prod",
        required=False,
        help="Which project the queries should be written to.",
    )
    arg_parser.add_argument(
        "--path",
        default="bigquery_etl/events_daily/query_templates",
        required=False,
        help="Where query directories will be searched for.",
    )
    arg_parser.add_argument(
        "--dataset",
        default=None,
        required=False,
        help=(
            "The dataset to run this for. "
            "If none selected, runs on all in the configuration yaml file."
        ),
    )
    arg_parser.add_argument(
        "--write-dir",
        default=BASE_DIR / "sql",
        required=False,
        help="The location to write to. Defaults to sql/.",
    )
    opts = arg_parser.parse_args()
    generate_queries(opts.project, opts.path, opts.dataset, opts.write_dir)


if __name__ == "__main__":
    main()
|
|
@ -0,0 +1,7 @@
|
|||
#!/bin/sh

# Generate Queries.

# Run from the repository root; abort if the directory change fails so the
# generator is never invoked against the wrong working directory.
cd "$(dirname "$0")/.." || exit 1

python3 -m bigquery_etl.events_daily.generate_queries "$@"
|
|
@ -0,0 +1,12 @@
|
|||
import os
|
||||
|
||||
from bigquery_etl.events_daily.generate_queries import get_query_dirs, QueryDir
|
||||
from pathlib import Path
|
||||
|
||||
BASE_DIR = Path(os.path.dirname(__file__)).parent
|
||||
|
||||
|
||||
class TestGenerateQueries:
    def test_get_query_dirs(self):
        """get_query_dirs yields exactly the event_types template directory."""
        templates_path = BASE_DIR / "templates"
        found = list(get_query_dirs(templates_path))
        expected = QueryDir("event_types", templates_path / "event_types")
        assert found == [expected]
|
|
@ -0,0 +1,36 @@
|
|||
import os
|
||||
import pytest
|
||||
|
||||
from bigquery_etl.events_daily.generate_queries import QueryDir, Template
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
BASE_DIR = Path(os.path.dirname(__file__)).parent
|
||||
|
||||
|
||||
class TestQueryDir:
    @pytest.fixture
    def query_dir(self):
        """A QueryDir over the event_types test templates."""
        return QueryDir("event_types", Path(BASE_DIR / "templates" / "event_types"))

    def test_get_datasets(self, query_dir):
        datasets = query_dir.get_datasets(query_dir.get_args())
        assert datasets == ["dataset-1", "dataset-2"]

    def test_get_datasets_with_arg(self, query_dir):
        datasets = query_dir.get_datasets(query_dir.get_args(), "dataset-2")
        assert datasets == ["dataset-2"]

    def test_get_templates(self, query_dir):
        expected = [Template("query.sql", query_dir.get_environment())]
        assert query_dir.get_templates() == expected

    def test_get_args(self, query_dir):
        expected = {
            "dataset-1": {"key": "val1"},
            "dataset-2": {"key": "val2"},
        }
        assert query_dir.get_args() == expected
|
|
@ -0,0 +1 @@
|
|||
{{ key }}
|
|
@ -0,0 +1,4 @@
|
|||
dataset-1:
|
||||
key: val1
|
||||
dataset-2:
|
||||
key: val2
|
Loading…
Link in new issue