Add --use_cloud_function option when generating SQL queries (#3565)

This commit is contained in:
Anna Scholtz 2023-02-03 14:10:04 -08:00 коммит произвёл GitHub
Родитель 1a595931aa
Коммит 20f764835f
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
13 изменённых файлов: 125 добавлений и 57 удалений

Просмотреть файл

@@ -5,7 +5,7 @@ from pathlib import Path
import click
from bigquery_etl.cli.utils import is_valid_project
from bigquery_etl.cli.utils import is_valid_project, use_cloud_function_option
SQL_GENERATORS_DIR = "sql_generators"
GENERATE_COMMAND = "generate"
@@ -71,10 +71,16 @@ generate = generate_group()
default=[],
multiple=True,
)
@use_cloud_function_option
@click.pass_context
def generate_all(ctx, output_dir, target_project, ignore):
def generate_all(ctx, output_dir, target_project, ignore, use_cloud_function):
"""Run all SQL generators."""
click.echo(f"Generating SQL content in {output_dir}.")
for _, cmd in reversed(generate.commands.items()):
if cmd.name != "all" and cmd.name not in ignore:
ctx.invoke(cmd, output_dir=output_dir, target_project=target_project)
ctx.invoke(
cmd,
output_dir=output_dir,
target_project=target_project,
use_cloud_function=use_cloud_function,
)

Просмотреть файл

@@ -7,6 +7,8 @@ import click
import yaml
from jinja2 import Environment, FileSystemLoader
from bigquery_etl.cli.utils import use_cloud_function_option
FILE_PATH = Path(os.path.dirname(__file__))
TEMPLATES_PATH = FILE_PATH / "templates"
@@ -25,7 +27,8 @@ TEMPLATES_PATH = FILE_PATH / "templates"
default=Path("sql"),
type=click.Path(file_okay=False),
)
def generate(target_project, output_dir):
@use_cloud_function_option
def generate(target_project, output_dir, use_cloud_function):
"""Generate a CSV that maps country aliases to a country code."""
target_path = Path(f"{output_dir}/{target_project}/static/country_names_v1/")
target_path.mkdir(parents=True, exist_ok=True)

Просмотреть файл

@@ -6,6 +6,8 @@ from pathlib import Path
import click
from pathos.multiprocessing import ProcessingPool
from bigquery_etl.cli.utils import use_cloud_function_option
NON_USER_FACING_DATASET_SUBSTRINGS = (
"_derived",
"_external",
@@ -41,7 +43,7 @@ def _generate_view_schema(sql_dir, view_directory):
view_references = extract_table_references(view_file.read_text())
if len(view_references) != 1:
return
target_project = view_dir.parent.parent.name
target_dataset = view_dir.parent.name
@@ -154,7 +156,8 @@ def _generate_view_schema(sql_dir, view_directory):
default=20,
type=int,
)
def generate(target_project, output_dir, parallelism):
@use_cloud_function_option
def generate(target_project, output_dir, parallelism, use_cloud_function):
"""
Generate schema yaml files for views in output_dir/target_project.

Просмотреть файл

@@ -8,7 +8,7 @@ import click
import yaml
from jinja2 import Environment, FileSystemLoader
from bigquery_etl.cli.utils import is_valid_project
from bigquery_etl.cli.utils import is_valid_project, use_cloud_function_option
from bigquery_etl.format_sql.formatter import reformat
TEMPLATED_FILES = {
@@ -145,7 +145,8 @@ def get_query_dirs(path):
type=click.Path(file_okay=False),
default="sql",
)
def generate(target_project, path, dataset, output_dir):
@use_cloud_function_option
def generate(target_project, path, dataset, output_dir, use_cloud_function):
"""Generate queries at the path for project."""
write_path = Path(output_dir) / target_project
for query_dir in get_query_dirs(path):

Просмотреть файл

@@ -6,6 +6,7 @@ import click
import yaml
from jinja2 import Environment, FileSystemLoader
from bigquery_etl.cli.utils import use_cloud_function_option
from bigquery_etl.format_sql.formatter import reformat
from bigquery_etl.util.common import write_sql
@@ -85,7 +86,8 @@ def generate_queries(project, path, write_dir):
default=Path("sql"),
type=click.Path(file_okay=False),
)
def generate(target_project, path, output_dir):
@use_cloud_function_option
def generate(target_project, path, output_dir, use_cloud_function):
"""Generate the experiment monitoring views."""
output_dir = Path(output_dir)
generate_queries(target_project, path, output_dir)

Просмотреть файл

@@ -7,6 +7,7 @@ import click
import yaml
from jinja2 import Environment, FileSystemLoader
from bigquery_etl.cli.utils import use_cloud_function_option
from bigquery_etl.format_sql.formatter import reformat
from bigquery_etl.schema import SCHEMA_FILE, Schema
from bigquery_etl.util.common import write_sql
@@ -97,8 +98,11 @@ def generate_schema(project, dataset, destination_table, write_dir):
default=Path("sql"),
type=click.Path(file_okay=False),
)
@use_cloud_function_option
@click.pass_context
def generate(ctx, target_project, dataset, destination_table, output_dir):
def generate(
ctx, target_project, dataset, destination_table, output_dir, use_cloud_function
):
"""Generate the feature usage table."""
output_dir = Path(output_dir)
generate_query(target_project, dataset, destination_table, output_dir)

Просмотреть файл

@@ -5,7 +5,11 @@ from pathlib import Path
import click
from pathos.multiprocessing import ProcessingPool
from bigquery_etl.cli.utils import is_valid_project, table_matches_patterns
from bigquery_etl.cli.utils import (
is_valid_project,
table_matches_patterns,
use_cloud_function_option,
)
from sql_generators.glean_usage import (
baseline_clients_daily,
baseline_clients_first_seen,
@@ -36,6 +40,7 @@ GLEAN_TABLES = [
# one to avoid confusion: https://github.com/mozilla/bigquery-etl/issues/2499
SKIP_APPS = ["mlhackweek_search", "regrets_reporter", "regrets_reporter_ucs"]
@click.command()
@click.option(
"--target-project",
@@ -73,7 +78,10 @@ SKIP_APPS = ["mlhackweek_search", "regrets_reporter", "regrets_reporter_ucs"]
"--app-name",
help="App to generate per-app dataset metadata and union views for.",
)
def generate(target_project, output_dir, parallelism, exclude, only, app_name):
@use_cloud_function_option
def generate(
target_project, output_dir, parallelism, exclude, only, app_name, use_cloud_function
):
"""Generate per-app_id queries and views, and per-app dataset metadata and union views.
Note that a file won't be generated if a corresponding file is already present
@@ -120,6 +128,7 @@ def generate(target_project, output_dir, parallelism, exclude, only, app_name):
table.generate_per_app_id,
target_project,
output_dir=output_dir,
use_cloud_function=use_cloud_function,
),
baseline_table,
)
@@ -131,7 +140,12 @@ def generate(target_project, output_dir, parallelism, exclude, only, app_name):
# and app_info
generate_per_app = [
(
partial(table.generate_per_app, target_project, output_dir=output_dir),
partial(
table.generate_per_app,
target_project,
output_dir=output_dir,
use_cloud_function=use_cloud_function,
),
info,
)
for info in app_info

Просмотреть файл

@@ -17,7 +17,9 @@ class BaselineClientsFirstSeenTable(GleanTable):
self.no_init = False
self.custom_render_kwargs = {}
def generate_per_app_id(self, project_id, baseline_table, output_dir=None):
def generate_per_app_id(
self, project_id, baseline_table, output_dir=None, use_cloud_function=True
):
"""Generate per-app_id datasets."""
self.custom_render_kwargs = dict(
# do not match on org_mozilla_firefoxreality

Просмотреть файл

@@ -147,7 +147,9 @@ class GleanTable:
self.per_app_enabled = True
self.cross_channel_template = "cross_channel.view.sql"
def generate_per_app_id(self, project_id, baseline_table, output_dir=None):
def generate_per_app_id(
self, project_id, baseline_table, output_dir=None, use_cloud_function=True
):
"""Generate the baseline table query per app_id."""
if not self.per_app_id_enabled:
return
@ -208,7 +210,9 @@ class GleanTable:
write_dataset_metadata(output_dir, view)
def generate_per_app(self, project_id, app_info, output_dir=None):
def generate_per_app(
self, project_id, app_info, output_dir=None, use_cloud_function=True
):
"""Generate the baseline table query per app_name."""
if not self.per_app_enabled:
return

Просмотреть файл

@@ -23,7 +23,9 @@ class EventsUnnestedTable(GleanTable):
self.per_app_id_enabled = False
self.cross_channel_template = "cross_channel_events_unnested.view.sql"
def generate_per_app(self, project_id, app_info, output_dir=None):
def generate_per_app(
self, project_id, app_info, output_dir=None, use_cloud_function=True
):
"""Generate the events_unnested table query per app_name."""
target_dataset = app_info[0]["app_name"]
if target_dataset not in DATASET_SKIP:

Просмотреть файл

@@ -44,7 +44,9 @@ class GleanAppPingViews(GleanTable):
self.per_app_id_enabled = False
self.per_app_enabled = True
def generate_per_app(self, project_id, app_info, output_dir=None):
def generate_per_app(
self, project_id, app_info, output_dir=None, use_cloud_function=True
):
"""
Generate per-app ping views across channels.
@ -90,11 +92,14 @@ class GleanAppPingViews(GleanTable):
channel_dataset,
view_name,
partitioned_by="submission_timestamp",
use_cloud_function=use_cloud_function,
)
cached_schemas[channel_dataset] = deepcopy(schema)
try:
unioned_schema.merge(schema, add_missing_fields=True, ignore_incompatible_fields=True)
unioned_schema.merge(
schema, add_missing_fields=True, ignore_incompatible_fields=True
)
except Exception as e:
# if schema incompatibilities are detected, then only generate for release channel
print(
@ -109,14 +114,20 @@ class GleanAppPingViews(GleanTable):
for app in app_info:
channel_dataset = app["document_namespace"].replace("-", "_")
if channel_dataset not in cached_schemas or cached_schemas[channel_dataset].schema["fields"] == []:
if (
channel_dataset not in cached_schemas
or cached_schemas[channel_dataset].schema["fields"] == []
):
# check for empty schemas (e.g. restricted ones) and skip for now
print(f"Cannot get schema for `{channel_dataset}.{view_name}`; Skipping")
print(
f"Cannot get schema for `{channel_dataset}.{view_name}`; Skipping"
)
continue
# compare table schema with unioned schema to determine fields that need to be NULL
select_expression = self._generate_select_expression(
unioned_schema.schema["fields"], cached_schemas[channel_dataset].schema["fields"]
unioned_schema.schema["fields"],
cached_schemas[channel_dataset].schema["fields"],
)
queries.append(
@ -147,7 +158,9 @@ class GleanAppPingViews(GleanTable):
skip_existing=True,
)
app_channels = [f"{channel['dataset']}.{view_name}" for channel in queries]
app_channels = [
f"{channel['dataset']}.{view_name}" for channel in queries
]
write_sql(
output_dir,
@ -193,40 +206,52 @@ class GleanAppPingViews(GleanTable):
if node.get("mode", None) == "REPEATED":
# unnest repeated record
select_expr.append(f"""
(
SELECT ARRAY_AGG(
STRUCT(
{self._generate_select_expression(node['fields'], app_schema_nodes[node_name]['fields'], [node_name])}
select_expr.append(
f"""
(
SELECT ARRAY_AGG(
STRUCT(
{self._generate_select_expression(node['fields'], app_schema_nodes[node_name]['fields'], [node_name])}
)
)
)
FROM UNNEST({'.'.join(path + [node_name])}) AS {node_name}
) AS {node_name}
""")
FROM UNNEST({'.'.join(path + [node_name])}) AS {node_name}
) AS {node_name}
"""
)
else:
# select struct fields
select_expr.append(f"""
STRUCT(
{self._generate_select_expression(node['fields'], app_schema_nodes[node_name]['fields'], path + [node_name])}
) AS {node_name}
""")
select_expr.append(
f"""
STRUCT(
{self._generate_select_expression(node['fields'], app_schema_nodes[node_name]['fields'], path + [node_name])}
) AS {node_name}
"""
)
else:
if node.get("mode", None) == "REPEATED":
select_expr.append(f"SAFE_CAST(NULL AS ARRAY<{dtype}>) AS {node_name}")
select_expr.append(
f"SAFE_CAST(NULL AS ARRAY<{dtype}>) AS {node_name}"
)
else:
select_expr.append(f"SAFE_CAST(NULL AS {dtype}) AS {node_name}")
select_expr.append(
f"SAFE_CAST(NULL AS {dtype}) AS {node_name}"
)
else:
if dtype == "RECORD":
# unwrap missing struct - workaround to prevent type incompatibilities; NULL is always INT in STRUCT
select_expr.append(f"""
STRUCT(
{self._generate_select_expression(node['fields'], {}, path + [node_name])}
) AS {node_name}
""")
select_expr.append(
f"""
STRUCT(
{self._generate_select_expression(node['fields'], {}, path + [node_name])}
) AS {node_name}
"""
)
else:
# field doesn't exist in app schema, set to NULL
if node.get("mode", None) == "REPEATED":
select_expr.append(f"SAFE_CAST(NULL AS ARRAY<{dtype}>) AS {node_name}")
select_expr.append(
f"SAFE_CAST(NULL AS ARRAY<{dtype}>) AS {node_name}"
)
else:
select_expr.append(f"SAFE_CAST(NULL AS {dtype}) AS {node_name}")

Просмотреть файл

@@ -15,6 +15,7 @@ from typing import List
import click
from jinja2 import Environment, FileSystemLoader
from bigquery_etl.cli.utils import use_cloud_function_option
from bigquery_etl.format_sql.formatter import reformat
# fmt: off
@@ -71,7 +72,8 @@ def union_statements(statements: List[str]):
help="GCP project ID",
default="moz-fx-data-shared-prod",
)
def generate(output_dir, target_project):
@use_cloud_function_option
def generate(output_dir, target_project, use_cloud_function):
"""Generate mobile search clients daily query and print to stdout."""
base_dir = Path(__file__).parent
@ -166,16 +168,10 @@ def generate(output_dir, target_project):
)
ios_focus_combined_metrics = union_statements(
[
f"SELECT * FROM metrics_{namespace}"
for namespace, _, _ in FOCUS_iOS_TUPLES
]
[f"SELECT * FROM metrics_{namespace}" for namespace, _, _ in FOCUS_iOS_TUPLES]
)
ios_klar_combined_metrics = union_statements(
[
f"SELECT * FROM metrics_{namespace}"
for namespace, _, _ in KLAR_iOS_TUPLES
]
[f"SELECT * FROM metrics_{namespace}" for namespace, _, _ in KLAR_iOS_TUPLES]
)
search_query = search_query_template.render(

Просмотреть файл

@@ -17,6 +17,7 @@ from pathlib import Path
import click
from pathos.multiprocessing import ProcessingPool
from bigquery_etl.cli.utils import use_cloud_function_option
from bigquery_etl.schema.stable_table_schema import SchemaFile, get_stable_table_schemas
VIEW_QUERY_TEMPLATE = """\
@@ -182,7 +183,9 @@ def write_view_if_not_exists(target_project: str, sql_dir: Path, schema: SchemaF
"'Firefox' AS normalized_app_name",
]
elif schema.schema_id.startswith("moz://mozilla.org/schemas/main/ping/"):
replacements += ["`moz-fx-data-shared-prod`.udf.normalize_main_payload(payload) AS payload"]
replacements += [
"`moz-fx-data-shared-prod`.udf.normalize_main_payload(payload) AS payload"
]
replacements_str = ",\n ".join(replacements)
full_sql = reformat(
VIEW_QUERY_TEMPLATE.format(
@@ -214,7 +217,9 @@ def write_view_if_not_exists(target_project: str, sql_dir: Path, schema: SchemaF
stable_table_schema = Schema.from_json({"fields": schema.schema})
view_schema.merge(
stable_table_schema, attributes=["description"], add_missing_fields=False
stable_table_schema,
attributes=["description"],
add_missing_fields=False,
)
view_schema.to_yaml_file(target_dir / "schema.yaml")
except Exception as e:
@@ -251,7 +256,8 @@ def write_view_if_not_exists(target_project: str, sql_dir: Path, schema: SchemaF
default=20,
type=int,
)
def generate(target_project, output_dir, log_level, parallelism):
@use_cloud_function_option
def generate(target_project, output_dir, log_level, parallelism, use_cloud_function):
"""
Generate view definitions.