diff --git a/bigquery_etl/cli/generate.py b/bigquery_etl/cli/generate.py index 5dae6e5738..7bb66699ea 100644 --- a/bigquery_etl/cli/generate.py +++ b/bigquery_etl/cli/generate.py @@ -5,7 +5,7 @@ from pathlib import Path import click -from bigquery_etl.cli.utils import is_valid_project +from bigquery_etl.cli.utils import is_valid_project, use_cloud_function_option SQL_GENERATORS_DIR = "sql_generators" GENERATE_COMMAND = "generate" @@ -71,10 +71,16 @@ generate = generate_group() default=[], multiple=True, ) +@use_cloud_function_option @click.pass_context -def generate_all(ctx, output_dir, target_project, ignore): +def generate_all(ctx, output_dir, target_project, ignore, use_cloud_function): """Run all SQL generators.""" click.echo(f"Generating SQL content in {output_dir}.") for _, cmd in reversed(generate.commands.items()): if cmd.name != "all" and cmd.name not in ignore: - ctx.invoke(cmd, output_dir=output_dir, target_project=target_project) + ctx.invoke( + cmd, + output_dir=output_dir, + target_project=target_project, + use_cloud_function=use_cloud_function, + ) diff --git a/sql_generators/country_code_lookup/__init__.py b/sql_generators/country_code_lookup/__init__.py index 6bb8593739..43246974b4 100644 --- a/sql_generators/country_code_lookup/__init__.py +++ b/sql_generators/country_code_lookup/__init__.py @@ -7,6 +7,8 @@ import click import yaml from jinja2 import Environment, FileSystemLoader +from bigquery_etl.cli.utils import use_cloud_function_option + FILE_PATH = Path(os.path.dirname(__file__)) TEMPLATES_PATH = FILE_PATH / "templates" @@ -25,7 +27,8 @@ TEMPLATES_PATH = FILE_PATH / "templates" default=Path("sql"), type=click.Path(file_okay=False), ) -def generate(target_project, output_dir): +@use_cloud_function_option +def generate(target_project, output_dir, use_cloud_function): """Generate a CSV that maps country aliases to a country code.""" target_path = Path(f"{output_dir}/{target_project}/static/country_names_v1/") target_path.mkdir(parents=True, exist_ok=True) diff --git a/sql_generators/derived_view_schemas/__init__.py b/sql_generators/derived_view_schemas/__init__.py index 8953676b52..8aed4b80db 100644 --- a/sql_generators/derived_view_schemas/__init__.py +++ b/sql_generators/derived_view_schemas/__init__.py @@ -6,6 +6,8 @@ from pathlib import Path import click from pathos.multiprocessing import ProcessingPool +from bigquery_etl.cli.utils import use_cloud_function_option + NON_USER_FACING_DATASET_SUBSTRINGS = ( "_derived", "_external", @@ -41,7 +43,7 @@ def _generate_view_schema(sql_dir, view_directory): view_references = extract_table_references(view_file.read_text()) if len(view_references) != 1: return - + target_project = view_dir.parent.parent.name target_dataset = view_dir.parent.name @@ -154,7 +156,8 @@ def _generate_view_schema(sql_dir, view_directory): default=20, type=int, ) -def generate(target_project, output_dir, parallelism): +@use_cloud_function_option +def generate(target_project, output_dir, parallelism, use_cloud_function): """ Generate schema yaml files for views in output_dir/target_project. diff --git a/sql_generators/events_daily/__init__.py b/sql_generators/events_daily/__init__.py index cc15203728..236b134296 100755 --- a/sql_generators/events_daily/__init__.py +++ b/sql_generators/events_daily/__init__.py @@ -8,7 +8,7 @@ import click import yaml from jinja2 import Environment, FileSystemLoader -from bigquery_etl.cli.utils import is_valid_project +from bigquery_etl.cli.utils import is_valid_project, use_cloud_function_option from bigquery_etl.format_sql.formatter import reformat TEMPLATED_FILES = { @@ -145,7 +145,8 @@ def get_query_dirs(path): type=click.Path(file_okay=False), default="sql", ) -def generate(target_project, path, dataset, output_dir): +@use_cloud_function_option +def generate(target_project, path, dataset, output_dir, use_cloud_function): """Generate queries at the path for project.""" write_path = Path(output_dir) / target_project for query_dir in get_query_dirs(path): diff --git a/sql_generators/experiment_monitoring/__init__.py b/sql_generators/experiment_monitoring/__init__.py index 1ee4699cbf..7732e45cb5 100644 --- a/sql_generators/experiment_monitoring/__init__.py +++ b/sql_generators/experiment_monitoring/__init__.py @@ -6,6 +6,7 @@ import click import yaml from jinja2 import Environment, FileSystemLoader +from bigquery_etl.cli.utils import use_cloud_function_option from bigquery_etl.format_sql.formatter import reformat from bigquery_etl.util.common import write_sql @@ -85,7 +86,8 @@ def generate_queries(project, path, write_dir): default=Path("sql"), type=click.Path(file_okay=False), ) -def generate(target_project, path, output_dir): +@use_cloud_function_option +def generate(target_project, path, output_dir, use_cloud_function): """Generate the experiment monitoring views.""" output_dir = Path(output_dir) generate_queries(target_project, path, output_dir) diff --git a/sql_generators/feature_usage/__init__.py b/sql_generators/feature_usage/__init__.py index 35e80cc7fe..294c9a1e81 100644 --- a/sql_generators/feature_usage/__init__.py +++ b/sql_generators/feature_usage/__init__.py @@ -7,6 +7,7 @@ import click import yaml from jinja2 import Environment, FileSystemLoader +from bigquery_etl.cli.utils import use_cloud_function_option from bigquery_etl.format_sql.formatter import reformat from bigquery_etl.schema import SCHEMA_FILE, Schema from bigquery_etl.util.common import write_sql @@ -97,8 +98,11 @@ def generate_schema(project, dataset, destination_table, write_dir): default=Path("sql"), type=click.Path(file_okay=False), ) +@use_cloud_function_option @click.pass_context -def generate(ctx, target_project, dataset, destination_table, output_dir): +def generate( + ctx, target_project, dataset, destination_table, output_dir, use_cloud_function +): """Generate the feature usage table.""" output_dir = Path(output_dir) generate_query(target_project, dataset, destination_table, output_dir) diff --git a/sql_generators/glean_usage/__init__.py b/sql_generators/glean_usage/__init__.py index d223bf52ca..83f75a1f39 100644 --- a/sql_generators/glean_usage/__init__.py +++ b/sql_generators/glean_usage/__init__.py @@ -5,7 +5,11 @@ from pathlib import Path import click from pathos.multiprocessing import ProcessingPool -from bigquery_etl.cli.utils import is_valid_project, table_matches_patterns +from bigquery_etl.cli.utils import ( + is_valid_project, + table_matches_patterns, + use_cloud_function_option, +) from sql_generators.glean_usage import ( baseline_clients_daily, baseline_clients_first_seen, @@ -36,6 +40,7 @@ GLEAN_TABLES = [ # one to avoid confusion: https://github.com/mozilla/bigquery-etl/issues/2499 SKIP_APPS = ["mlhackweek_search", "regrets_reporter", "regrets_reporter_ucs"] + @click.command() @click.option( "--target-project", @@ -73,7 +78,10 @@ SKIP_APPS = ["mlhackweek_search", "regrets_reporter", "regrets_reporter_ucs"] "--app-name", help="App to generate per-app dataset metadata and union views for.", ) -def generate(target_project, output_dir, parallelism, exclude, only, app_name): +@use_cloud_function_option +def generate( + target_project, output_dir, parallelism, exclude, only, app_name, use_cloud_function +): """Generate per-app_id queries and views, and per-app dataset metadata and union views. Note that a file won't be generated if a corresponding file is already present @@ -120,6 +128,7 @@ def generate(target_project, output_dir, parallelism, exclude, only, app_name): table.generate_per_app_id, target_project, output_dir=output_dir, + use_cloud_function=use_cloud_function, ), baseline_table, ) @@ -131,7 +140,12 @@ def generate(target_project, output_dir, parallelism, exclude, only, app_name): # and app_info generate_per_app = [ ( - partial(table.generate_per_app, target_project, output_dir=output_dir), + partial( + table.generate_per_app, + target_project, + output_dir=output_dir, + use_cloud_function=use_cloud_function, + ), info, ) for info in app_info diff --git a/sql_generators/glean_usage/baseline_clients_first_seen.py b/sql_generators/glean_usage/baseline_clients_first_seen.py index 3f168a6684..b7f815f8b3 100644 --- a/sql_generators/glean_usage/baseline_clients_first_seen.py +++ b/sql_generators/glean_usage/baseline_clients_first_seen.py @@ -17,7 +17,9 @@ class BaselineClientsFirstSeenTable(GleanTable): self.no_init = False self.custom_render_kwargs = {} - def generate_per_app_id(self, project_id, baseline_table, output_dir=None): + def generate_per_app_id( + self, project_id, baseline_table, output_dir=None, use_cloud_function=True + ): """Generate per-app_id datasets.""" self.custom_render_kwargs = dict( # do not match on org_mozilla_firefoxreality diff --git a/sql_generators/glean_usage/common.py b/sql_generators/glean_usage/common.py index aa6b873f90..31e1771cde 100644 --- a/sql_generators/glean_usage/common.py +++ b/sql_generators/glean_usage/common.py @@ -147,7 +147,9 @@ class GleanTable: self.per_app_enabled = True self.cross_channel_template = "cross_channel.view.sql" - def generate_per_app_id(self, project_id, baseline_table, output_dir=None): + def generate_per_app_id( + self, project_id, baseline_table, output_dir=None, use_cloud_function=True + ): """Generate the baseline table query per app_id.""" if not self.per_app_id_enabled: return @@ -208,7 +210,9 @@ class GleanTable: write_dataset_metadata(output_dir, view) - def generate_per_app(self, project_id, app_info, output_dir=None): + def generate_per_app( + self, project_id, app_info, output_dir=None, use_cloud_function=True + ): """Generate the baseline table query per app_name.""" if not self.per_app_enabled: return diff --git a/sql_generators/glean_usage/events_unnested.py b/sql_generators/glean_usage/events_unnested.py index b77eae6e0f..9b9996ed86 100644 --- a/sql_generators/glean_usage/events_unnested.py +++ b/sql_generators/glean_usage/events_unnested.py @@ -23,7 +23,9 @@ class EventsUnnestedTable(GleanTable): self.per_app_id_enabled = False self.cross_channel_template = "cross_channel_events_unnested.view.sql" - def generate_per_app(self, project_id, app_info, output_dir=None): + def generate_per_app( + self, project_id, app_info, output_dir=None, use_cloud_function=True + ): """Generate the events_unnested table query per app_name.""" target_dataset = app_info[0]["app_name"] if target_dataset not in DATASET_SKIP: diff --git a/sql_generators/glean_usage/glean_app_ping_views.py b/sql_generators/glean_usage/glean_app_ping_views.py index 3757b532dd..3bd57da5ce 100644 --- a/sql_generators/glean_usage/glean_app_ping_views.py +++ b/sql_generators/glean_usage/glean_app_ping_views.py @@ -44,7 +44,9 @@ class GleanAppPingViews(GleanTable): self.per_app_id_enabled = False self.per_app_enabled = True - def generate_per_app(self, project_id, app_info, output_dir=None): + def generate_per_app( + self, project_id, app_info, output_dir=None, use_cloud_function=True + ): """ Generate per-app ping views across channels. @@ -90,11 +92,14 @@ class GleanAppPingViews(GleanTable): channel_dataset, view_name, partitioned_by="submission_timestamp", + use_cloud_function=use_cloud_function, ) cached_schemas[channel_dataset] = deepcopy(schema) try: - unioned_schema.merge(schema, add_missing_fields=True, ignore_incompatible_fields=True) + unioned_schema.merge( + schema, add_missing_fields=True, ignore_incompatible_fields=True + ) except Exception as e: # if schema incompatibilities are detected, then only generate for release channel print( @@ -109,14 +114,20 @@ class GleanAppPingViews(GleanTable): for app in app_info: channel_dataset = app["document_namespace"].replace("-", "_") - if channel_dataset not in cached_schemas or cached_schemas[channel_dataset].schema["fields"] == []: + if ( + channel_dataset not in cached_schemas + or cached_schemas[channel_dataset].schema["fields"] == [] + ): # check for empty schemas (e.g. restricted ones) and skip for now - print(f"Cannot get schema for `{channel_dataset}.{view_name}`; Skipping") + print( + f"Cannot get schema for `{channel_dataset}.{view_name}`; Skipping" + ) continue # compare table schema with unioned schema to determine fields that need to be NULL select_expression = self._generate_select_expression( - unioned_schema.schema["fields"], cached_schemas[channel_dataset].schema["fields"] + unioned_schema.schema["fields"], + cached_schemas[channel_dataset].schema["fields"], ) queries.append( @@ -147,7 +158,9 @@ class GleanAppPingViews(GleanTable): skip_existing=True, ) - app_channels = [f"{channel['dataset']}.{view_name}" for channel in queries] + app_channels = [ + f"{channel['dataset']}.{view_name}" for channel in queries + ] write_sql( output_dir, @@ -193,40 +206,52 @@ class GleanAppPingViews(GleanTable): if node.get("mode", None) == "REPEATED": # unnest repeated record - select_expr.append(f""" - ( - SELECT ARRAY_AGG( - STRUCT( - {self._generate_select_expression(node['fields'], app_schema_nodes[node_name]['fields'], [node_name])} + select_expr.append( + f""" + ( + SELECT ARRAY_AGG( + STRUCT( + {self._generate_select_expression(node['fields'], app_schema_nodes[node_name]['fields'], [node_name])} + ) ) - ) - FROM UNNEST({'.'.join(path + [node_name])}) AS {node_name} - ) AS {node_name} - """) + FROM UNNEST({'.'.join(path + [node_name])}) AS {node_name} + ) AS {node_name} + """ + ) else: # select struct fields - select_expr.append(f""" - STRUCT( - {self._generate_select_expression(node['fields'], app_schema_nodes[node_name]['fields'], path + [node_name])} - ) AS {node_name} - """) + select_expr.append( + f""" + STRUCT( + {self._generate_select_expression(node['fields'], app_schema_nodes[node_name]['fields'], path + [node_name])} + ) AS {node_name} + """ + ) else: if node.get("mode", None) == "REPEATED": - select_expr.append(f"SAFE_CAST(NULL AS ARRAY<{dtype}>) AS {node_name}") + select_expr.append( + f"SAFE_CAST(NULL AS ARRAY<{dtype}>) AS {node_name}" + ) else: - select_expr.append(f"SAFE_CAST(NULL AS {dtype}) AS {node_name}") + select_expr.append( + f"SAFE_CAST(NULL AS {dtype}) AS {node_name}" + ) else: if dtype == "RECORD": # unwrap missing struct - workaround to prevent type incompatibilities; NULL is always INT in STRUCT - select_expr.append(f""" - STRUCT( - {self._generate_select_expression(node['fields'], {}, path + [node_name])} - ) AS {node_name} - """) + select_expr.append( + f""" + STRUCT( + {self._generate_select_expression(node['fields'], {}, path + [node_name])} + ) AS {node_name} + """ + ) else: # field doesn't exist in app schema, set to NULL if node.get("mode", None) == "REPEATED": - select_expr.append(f"SAFE_CAST(NULL AS ARRAY<{dtype}>) AS {node_name}") + select_expr.append( + f"SAFE_CAST(NULL AS ARRAY<{dtype}>) AS {node_name}" + ) else: select_expr.append(f"SAFE_CAST(NULL AS {dtype}) AS {node_name}") diff --git a/sql_generators/search/__init__.py b/sql_generators/search/__init__.py index fc32053b15..b76b6c3ee1 100755 --- a/sql_generators/search/__init__.py +++ b/sql_generators/search/__init__.py @@ -15,6 +15,7 @@ from typing import List import click from jinja2 import Environment, FileSystemLoader +from bigquery_etl.cli.utils import use_cloud_function_option from bigquery_etl.format_sql.formatter import reformat # fmt: off @@ -71,7 +72,8 @@ def union_statements(statements: List[str]): help="GCP project ID", default="moz-fx-data-shared-prod", ) -def generate(output_dir, target_project): +@use_cloud_function_option +def generate(output_dir, target_project, use_cloud_function): """Generate mobile search clients daily query and print to stdout.""" base_dir = Path(__file__).parent @@ -166,16 +168,10 @@ def generate(output_dir, target_project): ) ios_focus_combined_metrics = union_statements( - [ - f"SELECT * FROM metrics_{namespace}" - for namespace, _, _ in FOCUS_iOS_TUPLES - ] + [f"SELECT * FROM metrics_{namespace}" for namespace, _, _ in FOCUS_iOS_TUPLES] ) ios_klar_combined_metrics = union_statements( - [ - f"SELECT * FROM metrics_{namespace}" - for namespace, _, _ in KLAR_iOS_TUPLES - ] + [f"SELECT * FROM metrics_{namespace}" for namespace, _, _ in KLAR_iOS_TUPLES] ) search_query = search_query_template.render( diff --git a/sql_generators/stable_views/__init__.py b/sql_generators/stable_views/__init__.py index e5f42d8318..4c2c9988a7 100644 --- a/sql_generators/stable_views/__init__.py +++ b/sql_generators/stable_views/__init__.py @@ -17,6 +17,7 @@ from pathlib import Path import click from pathos.multiprocessing import ProcessingPool +from bigquery_etl.cli.utils import use_cloud_function_option from bigquery_etl.schema.stable_table_schema import SchemaFile, get_stable_table_schemas VIEW_QUERY_TEMPLATE = """\ @@ -182,7 +183,9 @@ def write_view_if_not_exists(target_project: str, sql_dir: Path, schema: SchemaF "'Firefox' AS normalized_app_name", ] elif schema.schema_id.startswith("moz://mozilla.org/schemas/main/ping/"): - replacements += ["`moz-fx-data-shared-prod`.udf.normalize_main_payload(payload) AS payload"] + replacements += [ + "`moz-fx-data-shared-prod`.udf.normalize_main_payload(payload) AS payload" + ] replacements_str = ",\n ".join(replacements) full_sql = reformat( VIEW_QUERY_TEMPLATE.format( @@ -214,7 +217,9 @@ def write_view_if_not_exists(target_project: str, sql_dir: Path, schema: SchemaF stable_table_schema = Schema.from_json({"fields": schema.schema}) view_schema.merge( - stable_table_schema, attributes=["description"], add_missing_fields=False + stable_table_schema, + attributes=["description"], + add_missing_fields=False, ) view_schema.to_yaml_file(target_dir / "schema.yaml") except Exception as e: @@ -251,7 +256,8 @@ def write_view_if_not_exists(target_project: str, sql_dir: Path, schema: SchemaF default=20, type=int, ) -def generate(target_project, output_dir, log_level, parallelism): +@use_cloud_function_option +def generate(target_project, output_dir, log_level, parallelism, use_cloud_function): """ Generate view definitions.