* Support Jinja templating in query files

* Formatting for Jinja

* ./bqetl query render command

* Fix running templates
This commit is contained in:
Anna Scholtz 2023-03-30 11:00:12 -07:00 коммит произвёл GitHub
Родитель 8575644d5e
Коммит 08b45a40fe
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
18 изменённых файлов: 231 добавлений и 74 удалений

Просмотреть файл

@ -344,6 +344,10 @@ jobs:
PATH="venv/bin:$PATH" script/bqetl generate all \
--output-dir /tmp/workspace/generated-sql/sql/ \
--target-project moz-fx-data-shared-prod
PATH="venv/bin:$PATH" script/bqetl query render \
--sql-dir /tmp/workspace/generated-sql/sql/ \
--output-dir /tmp/workspace/generated-sql/sql/ \
"/tmp/workspace/generated-sql/sql/"
PATH="venv/bin:$PATH" script/bqetl dependency record \
--skip-existing \
"/tmp/workspace/generated-sql/sql/"

Просмотреть файл

@ -53,6 +53,7 @@ from ..schema import SCHEMA_FILE, Schema
from ..util import extract_from_query_path
from ..util.bigquery_id import sql_table_id
from ..util.common import random_str
from ..util.common import render as render_template
from .dryrun import dryrun
from .generate import generate_all
@ -860,7 +861,19 @@ def _run_query(
# point to a public table it needs to be passed as parameter for the query
query_arguments.append("--destination_table={}".format(destination_table))
with open(query_file) as query_stream:
# write the rendered query to a temporary file;
# the query string cannot be passed directly to bq, because SQL comments would be interpreted as CLI arguments
with tempfile.NamedTemporaryFile(mode="w+") as query_stream:
query_stream.write(
render_template(
query_file.name,
template_folder=str(query_file.parent),
templates_dir="",
format=False,
)
)
query_stream.seek(0)
# run the query as shell command so that passed parameters can be used as is
subprocess.check_call(["bq"] + query_arguments, stdin=query_stream)
@ -1186,6 +1199,52 @@ def initialize(name, sql_dir, project_id, dry_run):
job.result()
@query.command(
    help="""Render a query Jinja template.
Examples:
./bqetl query render telemetry_derived.ssl_ratios_v1 \\
--output-dir=/tmp
""",
    context_settings=dict(
        ignore_unknown_options=True,
        allow_extra_args=True,
    ),
)
@click.argument("name")
@sql_dir_option
@click.option(
    "--output-dir",
    "--output_dir",
    help="Output directory generated SQL is written to. "
    + "If not specified, rendered queries are printed to console.",
    type=click.Path(file_okay=False),
    required=False,
)
def render(name, sql_dir, output_dir):
    """Render the Jinja templates of all query files matching ``name``.

    Args:
        name: Name pattern of the queries to render; passed through to
            ``paths_matching_name_pattern``.
        sql_dir: Directory that is searched for query files. When
            ``output_dir`` is set, rendered files keep their path relative
            to this directory.
        output_dir: Optional directory the rendered SQL is written to. When
            it is falsy, each query path and its rendered SQL are echoed to
            the console instead.
    """
    if name is None:
        name = "*.*"

    query_files = paths_matching_name_pattern(name, sql_dir, None)

    # Both directories are loop-invariant: resolve/convert them once instead
    # of once per query file, and avoid rebinding the ``sql_dir`` parameter.
    if output_dir:
        output_root = Path(output_dir)
        resolved_sql_dir = Path(sql_dir).resolve()

    for query_file in query_files:
        rendered_sql = render_template(
            query_file.name, template_folder=query_file.parent, templates_dir=""
        )

        if output_dir:
            # mirror the layout below ``sql_dir`` inside the output directory
            output_file = output_root / query_file.resolve().relative_to(
                resolved_sql_dir
            )
            output_file.parent.mkdir(parents=True, exist_ok=True)
            output_file.write_text(rendered_sql)
        else:
            click.echo(query_file)
            click.echo(rendered_sql)
@query.group(help="Commands for managing query schemas.")
def schema():
"""Create the CLI group for the query schema command."""

Просмотреть файл

@ -10,6 +10,7 @@ import click
import yaml
from bigquery_etl.schema.stable_table_schema import get_stable_table_schemas
from bigquery_etl.util.common import render
stable_views = None
@ -57,7 +58,8 @@ def extract_table_references_without_views(path: Path) -> Iterator[str]:
"""Recursively search for non-view tables referenced in the given SQL file."""
global stable_views
for table in extract_table_references(path.read_text()):
sql = render(path.name, template_folder=path.parent)
for table in extract_table_references(sql):
ref_base = path.parent
parts = tuple(table.split("."))
for _ in parts:
@ -105,7 +107,8 @@ def _get_references(
if without_views:
yield path, list(extract_table_references_without_views(path))
else:
yield path, extract_table_references(path.read_text())
sql = render(path.name, template_folder=path.parent)
yield path, extract_table_references(sql)
except CalledProcessError as e:
raise click.ClickException(f"failed to import jnius: {e}")
except ImportError as e:

Просмотреть файл

@ -23,6 +23,7 @@ import click
from google.cloud import bigquery
from .metadata.parse_metadata import Metadata
from .util.common import render
try:
from functools import cached_property # type: ignore
@ -317,7 +318,12 @@ class DryRun:
def get_sql(self):
"""Get SQL content."""
if exists(self.sqlfile):
sql = open(self.sqlfile).read()
file_path = Path(self.sqlfile)
sql = render(
file_path.name,
format=False,
template_folder=file_path.parent.absolute(),
)
else:
raise ValueError(f"Invalid file path: {self.sqlfile}")
if self.strip_dml:

Просмотреть файл

@ -15,6 +15,9 @@ from .tokenizer import (
ExpressionSeparator,
FieldAccessOperator,
Identifier,
JinjaComment,
JinjaExpression,
JinjaStatement,
Literal,
NewlineKeyword,
OpeningBracket,
@ -35,6 +38,7 @@ def simple_format(tokens, indent=" "):
allow_space_before_next_bracket = False
allow_space_before_next_token = False
prev_was_block_end = False
prev_was_jinja = False
prev_was_statement_separator = False
prev_was_unary_operator = False
next_operator_is_unary = True
@ -75,7 +79,7 @@ def simple_format(tokens, indent=" "):
elif isinstance(
token, (AliasSeparator, ExpressionSeparator, FieldAccessOperator)
):
if prev_was_block_end:
if prev_was_block_end or prev_was_jinja:
require_newline_before_next_token = False
# yield whitespace
@ -85,7 +89,7 @@ def simple_format(tokens, indent=" "):
# no space before statement separator
# no space before first token
pass
elif isinstance(token, Comment):
elif isinstance(token, (Comment, JinjaComment)):
# blank line before comments only if they start on their own line
# and come after a statement separator
if token.value.startswith("\n") and prev_was_statement_separator:
@ -130,12 +134,15 @@ def simple_format(tokens, indent=" "):
OpeningBracket,
ExpressionSeparator,
StatementSeparator,
JinjaStatement,
JinjaComment,
),
)
allow_space_before_next_token = not isinstance(token, FieldAccessOperator)
prev_was_block_end = isinstance(token, BlockEndKeyword)
prev_was_statement_separator = isinstance(token, StatementSeparator)
prev_was_unary_operator = next_operator_is_unary and isinstance(token, Operator)
prev_was_jinja = isinstance(token, JinjaExpression)
if not isinstance(token, Comment):
# format next operator as unary if there is no preceding argument
next_operator_is_unary = not isinstance(
@ -175,13 +182,17 @@ class Line:
self.indent_level -= 1
self.inline_tokens = []
self.inline_length = 0
self.can_format = can_format and not isinstance(indent_token, Comment)
self.can_format = can_format and not isinstance(
indent_token, (Comment, JinjaComment)
)
def add(self, token):
"""Add a token to this line."""
self.inline_length += len(token.value)
self.inline_tokens.append(token)
self.can_format = self.can_format and not isinstance(token, Comment)
self.can_format = self.can_format and not isinstance(
token, (Comment, JinjaComment)
)
@property
def tokens(self):

Просмотреть файл

@ -658,7 +658,7 @@ class AliasSeparator(SpaceBeforeBracketKeyword):
"""
pattern = re.compile(
r"AS(?=\s+(?!(WITH|SELECT|STRUCT|ARRAY)\b)[a-z_`(])", re.IGNORECASE
r"AS(?=\s+(?!(WITH|SELECT|STRUCT|ARRAY)\b)[a-z_`({])", re.IGNORECASE
)
@ -736,6 +736,33 @@ class Literal(Token):
)
class JinjaExpression(Token):
    """Jinja expression, including its ``{{ }}`` delimiters.

    The pattern matches lazily from ``{{`` up to the first ``}}`` that
    follows; ``re.DOTALL`` allows the expression to span multiple lines.

    May be followed by no whitespace or a new line and increased indent.
    """

    pattern = re.compile(r"{{.*?}}", re.DOTALL)
class JinjaStatement(Token):
    """Jinja statement, including its ``{% %}`` delimiters.

    The pattern matches lazily from ``{%`` up to the first ``%}`` that
    follows; ``re.DOTALL`` allows the statement to span multiple lines.

    May be followed by no whitespace or a new line and increased indent.
    """

    pattern = re.compile(r"{%.*?%}", re.DOTALL)
class JinjaComment(Token):
    """Jinja comment, including its ``{# #}`` delimiters.

    The pattern matches lazily from ``{#`` up to the first ``#}`` that
    follows; ``re.DOTALL`` allows the comment to span multiple lines.

    May be followed by no whitespace or a new line and increased indent.
    """

    pattern = re.compile(r"{#.*?#}", re.DOTALL)
class OpeningBracket(Token):
"""Opening bracket or parenthesis.
@ -814,6 +841,9 @@ BIGQUERY_TOKEN_PRIORITY = [
LineComment,
BlockComment,
Whitespace,
JinjaComment,
JinjaExpression,
JinjaStatement,
MaybeCaseSubclause,
CaseSubclause,
BlockMiddleKeyword,

Просмотреть файл

@ -9,6 +9,7 @@ from google.api_core.exceptions import BadRequest
from google.cloud import bigquery
from ..routine import parse_routine
from ..util.common import render
from .sql_test import (
TABLE_EXTENSIONS,
Table,
@ -82,7 +83,7 @@ class SqlTest(pytest.Item, pytest.File):
if test_name == "test_init":
init_test = True
query = read(f"{path}/init.sql")
query = render("init.sql", template_folder=path)
original, dest_name = (
f"{dataset_name}.{query_name}",
f"{dataset_name}_{query_name}_{test_name}",
@ -91,9 +92,9 @@ class SqlTest(pytest.Item, pytest.File):
query_name = dest_name
elif test_name == "test_script":
script_test = True
query = read(f"{path}/script.sql")
query = render("script.sql", template_folder=path)
else:
query = read(f"{path}/query.sql")
query = render("query.sql", template_folder=path)
expect = load(self.fspath.strpath, "expect")

Просмотреть файл

@ -15,6 +15,7 @@ import sqlparse
import yaml
from bigquery_etl.metadata.parse_metadata import METADATA_FILE
from bigquery_etl.util.common import render
UDF_CHAR = "[a-zA-Z0-9_]"
UDF_FILE = "udf.sql"
@ -129,14 +130,14 @@ class RawRoutine:
return ""
@classmethod
def from_file(cls, path, from_text=None):
def from_file(cls, path):
"""Create a RawRoutine instance from text."""
filepath = Path(path)
if from_text is None:
text = filepath.read_text()
else:
text = from_text
text = render(
filepath.name,
template_folder=filepath.parent,
format=False,
)
sql = sqlparse.format(text, strip_comments=True)
statements = [s for s in sqlparse.split(sql) if s.strip()]

Просмотреть файл

@ -53,9 +53,14 @@ def random_str(length: int = 12) -> str:
return "".join(random.choice(string.ascii_lowercase) for i in range(length))
def render(sql_filename, format=True, template_folder="glean_usage", **kwargs) -> str:
def render(
sql_filename,
template_folder,
format=True,
**kwargs,
) -> str:
"""Render a given template query using Jinja."""
file_loader = FileSystemLoader(f"{template_folder}/templates")
file_loader = FileSystemLoader(f"{template_folder}")
env = Environment(loader=file_loader)
main_sql = env.get_template(sql_filename)
rendered = main_sql.render(**kwargs)

Просмотреть файл

@ -21,6 +21,7 @@ from bigquery_etl.metadata.parse_metadata import (
)
from bigquery_etl.schema import Schema
from bigquery_etl.util import extract_from_query_path
from bigquery_etl.util.common import render
# skip validation for these views
SKIP_VALIDATION = {
@ -93,7 +94,8 @@ class View:
@property
def content(self):
"""Return the view SQL."""
return Path(self.path).read_text()
path = Path(self.path)
return render(path.name, template_folder=path.parent)
@classmethod
def from_file(cls, path):

Просмотреть файл

@ -1,3 +1,4 @@
{% raw %}
CREATE OR REPLACE FUNCTION hist.string_to_json(input STRING) AS (
CASE
WHEN STARTS_WITH(TRIM(input), '{')
@ -45,6 +46,7 @@ CREATE OR REPLACE FUNCTION hist.string_to_json(input STRING) AS (
END
);
{% endraw %}
-- Tests
WITH test_data AS (
SELECT

Просмотреть файл

@ -96,6 +96,7 @@ def generate(target_project, output_dir, use_cloud_function):
basename="metadata.yaml",
sql=render(
metadata_template,
template_folder="templates",
app_value=browser.value,
app_name=browser.name,
format=False,

Просмотреть файл

@ -181,21 +181,21 @@ class GleanTable:
render_kwargs.update(self.custom_render_kwargs)
render_kwargs.update(tables)
query_sql = render(query_filename, template_folder=PATH, **render_kwargs)
view_sql = render(view_filename, template_folder=PATH, **render_kwargs)
query_sql = render(query_filename, template_folder=PATH / "templates", **render_kwargs)
view_sql = render(view_filename, template_folder=PATH / "templates", **render_kwargs)
view_metadata = render(
view_metadata_filename, template_folder=PATH, format=False, **render_kwargs
view_metadata_filename, template_folder=PATH / "templates", format=False, **render_kwargs
)
table_metadata = render(
table_metadata_filename, template_folder=PATH, format=False, **render_kwargs
table_metadata_filename, template_folder=PATH / "templates", format=False, **render_kwargs
)
if not self.no_init:
try:
init_sql = render(init_filename, template_folder=PATH, **render_kwargs)
init_sql = render(init_filename, template_folder=PATH / "templates", **render_kwargs)
except TemplateNotFound:
init_sql = render(
query_filename, template_folder=PATH, init=True, **render_kwargs
query_filename, template_folder=PATH / "templates", init=True, **render_kwargs
)
if not (referenced_table_exists(view_sql)):
@ -254,7 +254,7 @@ class GleanTable:
if self.cross_channel_template:
sql = render(
self.cross_channel_template, template_folder=PATH, **render_kwargs
self.cross_channel_template, template_folder=PATH / "templates", **render_kwargs
)
view = f"{project_id}.{target_dataset}.{target_view_name}"
@ -269,13 +269,13 @@ class GleanTable:
write_sql(output_dir, view, "view.sql", sql, skip_existing=True)
else:
query_filename = f"{target_view_name}.query.sql"
query_sql = render(query_filename, template_folder=PATH, **render_kwargs)
query_sql = render(query_filename, template_folder=PATH / "templates", **render_kwargs)
view_sql = render(
f"{target_view_name}.view.sql", template_folder=PATH, **render_kwargs
f"{target_view_name}.view.sql", template_folder=PATH / "templates", **render_kwargs
)
metadata = render(
f"{self.target_table_id[:-3]}.metadata.yaml",
template_folder=PATH,
template_folder=PATH / "templates",
format=False,
**render_kwargs,
)

Просмотреть файл

@ -2,6 +2,7 @@ import os
import pytest
from click.testing import CliRunner
from jinja2.exceptions import TemplateNotFound
from bigquery_etl.dependency import show as dependency_show
@ -15,7 +16,7 @@ class TestDependency:
def test_format_invalid_path(self, runner):
result = runner.invoke(dependency_show, ["not-existing-path.sql"])
assert result.exit_code == 1
assert isinstance(result.exception, FileNotFoundError)
assert isinstance(result.exception, TemplateNotFound)
def test_format(self, runner):
with runner.isolated_filesystem():

Просмотреть файл

@ -0,0 +1,11 @@
{% set options = ["a", "b", "c"] %}{# sample comment #}
SELECT
{% for option in options %}
{% if option == "a" %}
"option a" AS a,
{% else %}
"{{ option }}" AS {{ option }},
{% endif %}
{% endfor %}
test,{# another comment #}
foo

Просмотреть файл

@ -1,3 +1,4 @@
import os
from pathlib import Path
from bigquery_etl.routine import parse_routine
@ -52,37 +53,6 @@ class TestParseRoutine:
)
assert result.dependencies == ["udf.test_bitmask_lowest_28"]
def test_raw_routine_from_text(self):
text = (
"CREATE OR REPLACE FUNCTION udf.test_js_udf() "
+ "AS (SELECT mozfun.json.mode_last('{}'))"
)
result = parse_routine.RawRoutine.from_file(
path=TEST_DIR
/ "data"
/ "test_sql"
/ "moz-fx-data-test-project"
/ "udf"
/ "test_js_udf"
/ "udf.sql",
from_text=text,
)
assert result.name == "udf.test_js_udf"
assert len(result.definitions) == 1
assert len(result.dependencies) == 1
assert "json.mode_last" in result.dependencies
assert result.tests == []
text = "CREATE OR REPLACE FUNCTION json.mode_last() " + "AS (SELECT 1)"
result = parse_routine.RawRoutine.from_file(
path=Path("sql") / "mozfun" / "json" / "mode_last" / "udf.sql",
from_text=text,
)
assert result.name == "json.mode_last"
assert len(result.definitions) == 1
assert result.dependencies == []
assert result.tests == []
def test_parse_routine(self):
raw_routine = parse_routine.RawRoutine.from_file(
self.udf_dir / "test_shift_28_bits_one_day" / "udf.sql"
@ -238,16 +208,19 @@ class TestParseRoutine:
== "Shift input bits one day left and drop any bits beyond 28 days."
)
def test_procedure(self):
def test_procedure(self, tmp_path):
text = (
"CREATE OR REPLACE PROCEDURE procedure.test_procedure(out STRING) "
"BEGIN "
"SET out = mozfun.json.mode_last('{}'); "
"END "
)
result = parse_routine.RawRoutine.from_file(
self.udf_dir.parent / "procedure" / "test_procedure" / "sql", from_text=text
procedure_file = (
tmp_path / "procedure" / "test_procedure" / "stored_procedure.sql"
)
os.makedirs(procedure_file.parent)
procedure_file.write_text(text)
result = parse_routine.RawRoutine.from_file(procedure_file)
assert result.name == "procedure.test_procedure"
assert len(result.definitions) == 1
assert len(result.dependencies) == 1
@ -260,9 +233,8 @@ class TestParseRoutine:
"SET out = ''; "
"END "
)
result = parse_routine.RawRoutine.from_file(
self.udf_dir.parent / "procedure" / "test_procedure" / "sql", from_text=text
)
procedure_file.write_text(text)
result = parse_routine.RawRoutine.from_file(procedure_file)
assert result.name == "procedure.test_procedure"
assert len(result.definitions) == 1
assert result.dependencies == []

Просмотреть файл

@ -15,7 +15,7 @@ class TestEntrypoint:
os.makedirs(query_file_path)
query_file = query_file_path / "query.sql"
query_file.write_text("SELECT 1 AS a, 'abc' AS b;")
query_file.write_text("-- comment \n SELECT 1 AS a, 'abc' AS b;")
try:
result = subprocess.check_output(
@ -27,7 +27,52 @@ class TestEntrypoint:
],
stderr=subprocess.STDOUT,
)
assert b"Current status: DONE" in result
assert (
b"+---+-----+\n| a | b |\n+---+-----+\n| 1 | abc |\n+---+-----+\n"
in result
)
assert b"No metadata.yaml found for {}" in result
except subprocess.CalledProcessError as e:
# running bq in CircleCI will fail since it's not installed
# but the error output can be checked for whether bq was called
print(e.output)
assert b"No such file or directory: 'bq'" in e.output
assert b"No metadata.yaml found for {}" in e.output
assert (
b'subprocess.check_call(["bq"] + query_arguments, stdin=query_stream)'
in e.output
)
@pytest.mark.integration
def test_run_templated_query(self, tmp_path, project_id):
query_file_path = tmp_path / "sql" / project_id / "query_v1"
os.makedirs(query_file_path)
query_file = query_file_path / "query.sql"
sql = """
{% set options = ["a", "b", "c"] %}
SELECT
{% for option in options %}
"{{ option }}" AS {{ option }},
{% endfor %}
"""
query_file.write_text(sql)
try:
result = subprocess.check_output(
[
ENTRYPOINT_SCRIPT,
"query",
"--project_id=" + project_id,
str(query_file),
],
stderr=subprocess.STDOUT,
)
assert (
b"+---+---+---+\n| a | b | c |\n+---+---+---+\n| a | b | c |\n+---+---+---+"
in result
)
assert b"No metadata.yaml found for {}" in result
except subprocess.CalledProcessError as e:
# running bq in CircleCI will fail since it's not installed

Просмотреть файл

@ -12,7 +12,7 @@ class TestRunQuery:
query_file_path = tmp_path / "sql" / "test" / "query_v1"
os.makedirs(query_file_path)
query_file = query_file_path / "query.sql"
query_file.write_text("SELECT 1")
query_file.write_text("-- comment \n SELECT 1")
metadata_conf = {
"friendly_name": "test",
@ -34,7 +34,11 @@ class TestRunQuery:
assert result.exit_code == 0
assert mock_call.call_args.args == (
["bq", "--dataset_id=test", "--destination_table=query_v1"],
[
"bq",
"--dataset_id=test",
"--destination_table=query_v1",
],
)
assert "stdin" in mock_call.call_args.kwargs
@ -71,7 +75,6 @@ class TestRunQuery:
"--destination_table=mozilla-public-data:test.query_v1",
],
)
assert "stdin" in mock_call.call_args.kwargs
def test_run_query_public_project_no_dataset(self, tmp_path):
query_file_path = tmp_path / "sql" / "test" / "query_v1"