From 49bcef40f8ef411932b11aa5283e14a5acaab258 Mon Sep 17 00:00:00 2001
From: Lucia Vargas
Date: Tue, 26 Sep 2023 17:16:10 +0200
Subject: [PATCH] DS-2948 Document the initialization of clients_first_seen_v2, which is not running from main branch.
---
 README.md | 2 +-
 .../README.md | 22 +
 .../bigquery_etl_cli_query.py | 2261 +++++++++++++++++
 ...ry_derived_clients_first_seen_v2_query.sql | 360 +++
 4 files changed, 2644 insertions(+), 1 deletion(-)
 create mode 100644 backfill/2023-09-26-initialize-clients_first_seen_v2/README.md
 create mode 100644 backfill/2023-09-26-initialize-clients_first_seen_v2/bigquery_etl_cli_query.py
 create mode 100644 backfill/2023-09-26-initialize-clients_first_seen_v2/telemetry_derived_clients_first_seen_v2_query.sql

diff --git a/README.md b/README.md
index 0d2c8e3..1f3eece 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@
 There is a `script` directory containing relatively pristine
 reference scripts that you can copy and paste into a new backfill
 scenario and modify for your particular needs.
 
-There is a `backfills` directory where each subdirectory should be a dated
+There is a `backfill` directory where each subdirectory should be a dated
 backfill event, containing all the scripts used and a description of the
 overall scenario.
diff --git a/backfill/2023-09-26-initialize-clients_first_seen_v2/README.md b/backfill/2023-09-26-initialize-clients_first_seen_v2/README.md
new file mode 100644
index 0000000..36befb9
--- /dev/null
+++ b/backfill/2023-09-26-initialize-clients_first_seen_v2/README.md
@@ -0,0 +1,22 @@
+# Initialize table telemetry_derived.clients_first_seen_v2
+
+ - [DS-2948](https://mozilla-hub.atlassian.net/browse/DS-2948/): Redefining Desktop first_seen_date and New Profiles
+ - GCP project used: `moz-fx-data-shared-prod.telemetry_derived`
+
+## Context
+
+ - The table telemetry_derived.clients_first_seen_v2 redefines the first_seen_date for Desktop clients by querying 3 pings: main, new_profile and first_shutdown.
+ - Given the size of the data, the table must be initialized in parallel; this relies on bigquery-etl and a query that are susceptible to change due to ongoing updates to the _run_query function (see the sketch below).
+ - Therefore, the decision is to initialize the table with the current state of bigquery-etl and the script, both of which are archived in this directory.
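+ - For reference, the archived query template only emits the `INSERT INTO` clause when it is rendered with the `is_init()` and `parallel_run()` flags, which is what lets `./bqetl query initialize` populate the table in parallel (one `sample_id` slice at a time, as implemented in `bigquery_etl_cli_query.py`). A minimal sketch of that guard, abridged from `telemetry_derived_clients_first_seen_v2_query.sql`:
+
+```sql
+-- Abridged from telemetry_derived_clients_first_seen_v2_query.sql (not the full query)
+{% if is_init() and parallel_run() %}
+INSERT INTO
+  `moz-fx-data-shared-prod.telemetry_derived.clients_first_seen_v2`
+{% endif %}
+-- ...followed by the SELECT that combines the main, new_profile and first_shutdown pings
+```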
+
+
+## Step 1: Run from repository [bigquery-etl](https://github.com/mozilla/bigquery-etl)
+
+- `git checkout DS-3054_support_running_initialization_query_in_parallel`
+
+- If the branch is not available, update the following files in the repository with the copies archived in this directory (`bigquery_etl_cli_query.py` and `telemetry_derived_clients_first_seen_v2_query.sql`):
+  - `bigquery_etl/cli/query.py`
+  - `sql/moz-fx-data-shared-prod/telemetry_derived/clients_first_seen_v2/query.sql`
+
+## Step 2: Run the initialization command
+`./bqetl query initialize sql/moz-fx-data-shared-prod/telemetry_derived/clients_first_seen_v2/query.sql`
diff --git a/backfill/2023-09-26-initialize-clients_first_seen_v2/bigquery_etl_cli_query.py b/backfill/2023-09-26-initialize-clients_first_seen_v2/bigquery_etl_cli_query.py
new file mode 100644
index 0000000..5e213f6
--- /dev/null
+++ b/backfill/2023-09-26-initialize-clients_first_seen_v2/bigquery_etl_cli_query.py
@@ -0,0 +1,2261 @@
+"""bigquery-etl CLI query command."""
+
+import copy
+import datetime
+import logging
+import multiprocessing
+import os
+import re
+import string
+import subprocess
+import sys
+import tempfile
+import typing
+from datetime import date, timedelta
+from functools import partial
+from multiprocessing.pool import Pool, ThreadPool
+from pathlib import Path
+from tempfile import NamedTemporaryFile
+from timeit import default_timer
+from traceback import print_exc
+
+import click
+import yaml
+from dateutil.rrule import MONTHLY, rrule
+from google.cloud import bigquery
+from google.cloud.exceptions import NotFound, PreconditionFailed
+
+from ..backfill.utils import QUALIFIED_TABLE_NAME_RE, qualified_table_name_matching
+from ..cli.format import format
+from ..cli.utils import (
+    is_authenticated,
+    is_valid_project,
+    no_dryrun_option,
+    parallelism_option,
+    paths_matching_name_pattern,
+    project_id_option,
+    respect_dryrun_skip_option,
+    sql_dir_option,
+    temp_dataset_option,
+    use_cloud_function_option,
+)
+from ..config import ConfigLoader
+from ..dependency import get_dependency_graph
+from ..dryrun import DryRun
+from ..format_sql.format import skip_format
+from ..format_sql.formatter import reformat
+from ..metadata import validate_metadata
+from ..metadata.parse_metadata import (
+    METADATA_FILE,
+    BigQueryMetadata,
+    ClusteringMetadata,
+    DatasetMetadata,
+    ExternalDataFormat,
+    Metadata,
+    PartitionMetadata,
+    PartitionType,
+)
+from ..query_scheduling.dag_collection import DagCollection
+from ..query_scheduling.generate_airflow_dags import get_dags
+from ..schema import SCHEMA_FILE, Schema
+from ..util import extract_from_query_path
+from ..util.bigquery_id import sql_table_id
+from ..util.common import random_str
+from ..util.common import render as render_template
+from ..util.parallel_topological_sorter import ParallelTopologicalSorter
+from .dryrun import dryrun
+from .generate import generate_all
+
+QUERY_NAME_RE = re.compile(r"(?P<dataset>[a-zA-z0-9_]+)\.(?P<name>[a-zA-z0-9_]+)")
+VERSION_RE = re.compile(r"_v[0-9]+")
+DESTINATION_TABLE_RE = re.compile(r"^[a-zA-Z0-9_$]{0,1024}$")
+DEFAULT_DAG_NAME = "bqetl_default"
+DEFAULT_PARALLELISM = 10
+
+
+@click.group(help="Commands for managing queries.")
+@click.pass_context
+def query(ctx):
+    """Create the CLI group for the query command."""
+    # create temporary directory generated content is written to
+    # the directory will be deleted automatically after the command exits
+    ctx.ensure_object(dict)
+    ctx.obj["TMP_DIR"] = ctx.with_resource(tempfile.TemporaryDirectory())
+
+
+@query.command(
+    help="""Create a new query with name
+    <dataset>.<query_name>, for example: telemetry_derived.active_profiles.
+ Use the `--project_id` option to change the project the query is added to; + default is `moz-fx-data-shared-prod`. Views are automatically generated + in the publicly facing dataset. + + Examples: + + \b + ./bqetl query create telemetry_derived.deviations_v1 \\ + --owner=example@mozilla.com + + \b + # The query version gets autocompleted to v1. Queries are created in the + # _derived dataset and accompanying views in the public dataset. + ./bqetl query create telemetry.deviations --owner=example@mozilla.com + """, +) +@click.argument("name") +@sql_dir_option +@project_id_option( + ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod") +) +@click.option( + "--owner", + "-o", + help="Owner of the query (email address)", + default="example@mozilla.com", +) +@click.option( + "--init", + "-i", + help="Create an init.sql file to initialize the table", + default=False, + is_flag=True, +) +@click.option( + "--dag", + "-d", + help=( + f"Name of the DAG the query should be scheduled under." + "If there is no DAG name specified, the query is" + f"scheduled by default in DAG {DEFAULT_DAG_NAME}." + "To skip the automated scheduling use --no_schedule." + "To see available DAGs run `bqetl dag info`." + "To create a new DAG run `bqetl dag create`." + ), + default=DEFAULT_DAG_NAME, +) +@click.option( + "--no_schedule", + "--no-schedule", + help=( + "Using this option creates the query without scheduling information." + " Use `bqetl query schedule` to add it manually if required." + ), + default=False, + is_flag=True, +) +@click.pass_context +def create(ctx, name, sql_dir, project_id, owner, init, dag, no_schedule): + """CLI command for creating a new query.""" + # create directory structure for query + try: + match = QUERY_NAME_RE.match(name) + name = match.group("name") + dataset = match.group("dataset") + + version = "_" + name.split("_")[-1] + if not VERSION_RE.match(version): + version = "_v1" + else: + name = "_".join(name.split("_")[:-1]) + except AttributeError: + click.echo( + "New queries must be named like:" + + " . or .
_v[n]" + ) + sys.exit(1) + + derived_path = None + view_path = None + path = Path(sql_dir) + + if dataset.endswith("_derived"): + # create a directory for the corresponding view + derived_path = path / project_id / dataset / (name + version) + derived_path.mkdir(parents=True) + + view_path = path / project_id / dataset.replace("_derived", "") / name + view_path.mkdir(parents=True) + else: + # check if there is a corresponding derived dataset + if (path / project_id / (dataset + "_derived")).exists(): + derived_path = path / project_id / (dataset + "_derived") / (name + version) + derived_path.mkdir(parents=True) + view_path = path / project_id / dataset / name + view_path.mkdir(parents=True) + + dataset = dataset + "_derived" + else: + # some dataset that is not specified as _derived + # don't automatically create views + derived_path = path / project_id / dataset / (name + version) + derived_path.mkdir(parents=True) + + click.echo(f"Created query in {derived_path}") + + if view_path: + click.echo(f"Created corresponding view in {view_path}") + view_file = view_path / "view.sql" + view_dataset = dataset.replace("_derived", "") + view_file.write_text( + reformat( + f"""CREATE OR REPLACE VIEW + `{project_id}.{view_dataset}.{name}` + AS SELECT * FROM + `{project_id}.{dataset}.{name}{version}`""" + ) + + "\n" + ) + + # create query.sql file + query_file = derived_path / "query.sql" + query_file.write_text( + reformat( + f"""-- Query for {dataset}.{name}{version} + -- For more information on writing queries see: + -- https://docs.telemetry.mozilla.org/cookbooks/bigquery/querying.html + SELECT * FROM table WHERE submission_date = @submission_date""" + ) + + "\n" + ) + + # create default metadata.yaml + metadata_file = derived_path / "metadata.yaml" + metadata = Metadata( + friendly_name=string.capwords(name.replace("_", " ")), + description="Please provide a description for the query", + owners=[owner], + labels={"incremental": True}, + bigquery=BigQueryMetadata( + time_partitioning=PartitionMetadata(field="", type=PartitionType.DAY), + clustering=ClusteringMetadata(fields=[]), + ), + ) + metadata.write(metadata_file) + + # optionally create init.sql + if init: + init_file = derived_path / "init.sql" + init_file.write_text( + reformat( + f""" + -- SQL for initializing the query destination table. 
+ CREATE OR REPLACE TABLE + `{ConfigLoader.get('default', 'project', fallback="moz-fx-data-shared-prod")}.{dataset}.{name}{version}` + AS SELECT * FROM table""" + ) + + "\n" + ) + + dataset_metadata_file = derived_path.parent / "dataset_metadata.yaml" + if not dataset_metadata_file.exists(): + dataset_name = str(dataset_metadata_file.parent.name) + dataset_metadata = DatasetMetadata( + friendly_name=string.capwords(dataset_name.replace("_", " ")), + description="Please provide a description for the dataset", + dataset_base_acl="derived", + user_facing=False, + ) + dataset_metadata.write(dataset_metadata_file) + click.echo(f"Created dataset metadata in {dataset_metadata_file}") + + if view_path: + dataset_metadata_file = view_path.parent / "dataset_metadata.yaml" + if not dataset_metadata_file.exists(): + dataset_name = str(dataset_metadata_file.parent.name) + dataset_metadata = DatasetMetadata( + friendly_name=string.capwords(dataset_name.replace("_", " ")), + description="Please provide a description for the dataset", + dataset_base_acl="view", + user_facing=True, + ) + dataset_metadata.write(dataset_metadata_file) + click.echo(f"Created dataset metadata in {dataset_metadata_file}") + + if no_schedule: + click.echo( + click.style( + "WARNING: This query has been created without " + "scheduling information. Use `bqetl query schedule`" + " to manually add it to a DAG or " + "`bqetl query create --help` for more options.", + fg="yellow", + ) + ) + else: + ctx.invoke(schedule, name=derived_path, dag=dag) + + +@query.command( + help="""Schedule an existing query + + Examples: + + \b + ./bqetl query schedule telemetry_derived.deviations_v1 \\ + --dag=bqetl_deviations + + \b + # Set a specific name for the task + ./bqetl query schedule telemetry_derived.deviations_v1 \\ + --dag=bqetl_deviations \\ + --task-name=deviations + """, +) +@click.argument("name") +@sql_dir_option +@project_id_option() +@click.option( + "--dag", + "-d", + help=( + "Name of the DAG the query should be scheduled under. " + "To see available DAGs run `bqetl dag info`. " + "To create a new DAG run `bqetl dag create`." + ), +) +@click.option( + "--depends_on_past", + "--depends-on-past", + help="Only execute query if previous scheduled run succeeded.", + default=False, + type=bool, +) +@click.option( + "--task_name", + "--task-name", + help=( + "Custom name for the Airflow task. By default the task name is a " + "combination of the dataset and table name." + ), +) +def schedule(name, sql_dir, project_id, dag, depends_on_past, task_name): + """CLI command for scheduling a query.""" + query_files = paths_matching_name_pattern(name, sql_dir, project_id) + + if query_files == []: + click.echo(f"Name doesn't refer to any queries: {name}", err=True) + sys.exit(1) + + sql_dir = Path(sql_dir) + + dags = DagCollection.from_file(sql_dir.parent / "dags.yaml") + + dags_to_be_generated = set() + + for query_file in query_files: + try: + metadata = Metadata.of_query_file(query_file) + except FileNotFoundError: + click.echo(f"Cannot schedule {query_file}. No metadata.yaml found.") + continue + + if dag: + # check if DAG already exists + existing_dag = dags.dag_by_name(dag) + if not existing_dag: + click.echo( + ( + f"DAG {dag} does not exist. " + "To see available DAGs run `bqetl dag info`. " + "To create a new DAG run `bqetl dag create`." 
+ ), + err=True, + ) + sys.exit(1) + + # write scheduling information to metadata file + metadata.scheduling = {} + metadata.scheduling["dag_name"] = dag + + if depends_on_past: + metadata.scheduling["depends_on_past"] = depends_on_past + + if task_name: + metadata.scheduling["task_name"] = task_name + + metadata.write(query_file.parent / METADATA_FILE) + logging.info( + f"Updated {query_file.parent / METADATA_FILE} with scheduling" + " information. For more information about scheduling queries see: " + "https://github.com/mozilla/bigquery-etl#scheduling-queries-in-airflow" + ) + + # update dags since new task has been added + dags = get_dags(None, sql_dir.parent / "dags.yaml", sql_dir=sql_dir) + dags_to_be_generated.add(dag) + else: + dags = get_dags(None, sql_dir.parent / "dags.yaml", sql_dir=sql_dir) + if metadata.scheduling == {}: + click.echo(f"No scheduling information for: {query_file}", err=True) + sys.exit(1) + else: + dags_to_be_generated.add(metadata.scheduling["dag_name"]) + + # re-run DAG generation for the affected DAG + for d in dags_to_be_generated: + existing_dag = dags.dag_by_name(d) + logging.info(f"Running DAG generation for {existing_dag.name}") + output_dir = sql_dir.parent / "dags" + dags.dag_to_airflow(output_dir, existing_dag) + + +@query.command( + help="""Get information about all or specific queries. + + Examples: + + \b + # Get info for specific queries + ./bqetl query info telemetry_derived.* + + \b + # Get cost and last update timestamp information + ./bqetl query info telemetry_derived.clients_daily_v6 \\ + --cost --last_updated + """, +) +@click.argument("name", required=False) +@sql_dir_option +@project_id_option() +@click.option("--cost", help="Include information about query costs", is_flag=True) +@click.option( + "--last_updated", + help="Include timestamps when destination tables were last updated", + is_flag=True, +) +@click.pass_context +def info(ctx, name, sql_dir, project_id, cost, last_updated): + """Return information about all or specific queries.""" + if name is None: + name = "*.*" + + query_files = paths_matching_name_pattern(name, sql_dir, project_id) + if query_files == []: + # run SQL generators if no matching query has been found + ctx.invoke( + generate_all, + output_dir=ctx.obj["TMP_DIR"], + ignore=["derived_view_schemas", "stable_views"], + ) + query_files = paths_matching_name_pattern(name, ctx.obj["TMP_DIR"], project_id) + + for query_file in query_files: + query_file_path = Path(query_file) + table = query_file_path.parent.name + dataset = query_file_path.parent.parent.name + project = query_file_path.parent.parent.parent.name + + try: + metadata = Metadata.of_query_file(query_file) + except FileNotFoundError: + metadata = None + + click.secho(f"{project}.{dataset}.{table}", bold=True) + click.echo(f"path: {query_file}") + + if metadata is None: + click.echo("No metadata") + else: + click.echo(f"description: {metadata.description}") + click.echo(f"owners: {metadata.owners}") + + if metadata.scheduling == {}: + click.echo("scheduling: not scheduled") + else: + click.echo("scheduling:") + click.echo(f" dag_name: {metadata.scheduling['dag_name']}") + + if cost or last_updated: + if not is_authenticated(): + click.echo( + "Authentication to GCP required for " + "accessing cost and last_updated." 
+ ) + else: + client = bigquery.Client() + end_date = date.today().strftime("%Y-%m-%d") + start_date = (date.today() - timedelta(7)).strftime("%Y-%m-%d") + result = client.query( + f""" + SELECT + SUM(cost_usd) AS cost, + MAX(creation_time) AS last_updated + FROM `moz-fx-data-shared-prod.monitoring_derived.bigquery_etl_scheduled_queries_cost_v1` + WHERE submission_date BETWEEN '{start_date}' AND '{end_date}' + AND dataset = '{dataset}' + AND table = '{table}' + """ # noqa E501 + ).result() + + if result.total_rows == 0: + if last_updated: + click.echo("last_updated: never") + if cost: + click.echo("Cost over the last 7 days: none") + + for row in result: + if last_updated: + click.echo(f" last_updated: {row.last_updated}") + if cost: + click.echo( + f" Cost over the last 7 days: {round(row.cost, 2)} USD" + ) + click.echo("") + + +def _backfill_query( + query_file_path, + project_id, + date_partition_parameter, + exclude, + max_rows, + dry_run, + no_partition, + args, + partitioning_type, + backfill_date, + destination_table, +): + """Run a query backfill for a specific date.""" + project, dataset, table = extract_from_query_path(query_file_path) + + match partitioning_type: + case PartitionType.DAY: + partition = backfill_date.strftime("%Y%m%d") + case PartitionType.MONTH: + partition = backfill_date.strftime("%Y%m") + case _: + raise ValueError(f"Unsupported partitioning type: {partitioning_type}") + + backfill_date = backfill_date.strftime("%Y-%m-%d") + if backfill_date not in exclude: + if destination_table is None: + destination_table = f"{project}.{dataset}.{table}" + + if not no_partition: + destination_table = f"{destination_table}${partition}" + + if not QUALIFIED_TABLE_NAME_RE.match(destination_table): + click.echo( + "Destination table must be named like:" + " ..
" + ) + sys.exit(1) + + click.echo( + f"Run backfill for {destination_table} " + f"with @{date_partition_parameter}={backfill_date}" + ) + + arguments = [ + "query", + f"--parameter={date_partition_parameter}:DATE:{backfill_date}", + "--use_legacy_sql=false", + "--replace", + f"--max_rows={max_rows}", + f"--project_id={project_id}", + "--format=none", + ] + args + if dry_run: + arguments += ["--dry_run"] + + _run_query( + [query_file_path], + project_id=project_id, + dataset_id=dataset, + destination_table=destination_table, + public_project_id=ConfigLoader.get( + "default", "public_project", fallback="mozilla-public-data" + ), + query_arguments=arguments, + ) + + else: + click.echo( + f"Skip {query_file_path} with @{date_partition_parameter}={backfill_date}" + ) + + return True + + +@query.command( + help="""Run a backfill for a query. Additional parameters will get passed to bq. + + Examples: + + \b + # Backfill for specific date range + # second comment line + ./bqetl query backfill telemetry_derived.ssl_ratios_v1 \\ + --start_date=2021-03-01 \\ + --end_date=2021-03-31 + + \b + # Dryrun backfill for specific date range and exclude date + ./bqetl query backfill telemetry_derived.ssl_ratios_v1 \\ + --start_date=2021-03-01 \\ + --end_date=2021-03-31 \\ + --exclude=2021-03-03 \\ + --dry_run + """, + context_settings=dict( + ignore_unknown_options=True, + allow_extra_args=True, + ), +) +@click.argument("name") +@sql_dir_option +@project_id_option(required=True) +@click.option( + "--start_date", + "--start-date", + "-s", + help="First date to be backfilled", + type=click.DateTime(formats=["%Y-%m-%d"]), + required=True, +) +@click.option( + "--end_date", + "--end-date", + "-e", + help="Last date to be backfilled", + type=click.DateTime(formats=["%Y-%m-%d"]), + default=str(date.today()), +) +@click.option( + "--exclude", + "-x", + multiple=True, + help="Dates excluded from backfill. Date format: yyyy-mm-dd", + default=[], +) +@click.option( + "--dry_run/--no_dry_run", "--dry-run/--no-dry-run", help="Dry run the backfill" +) +@click.option( + "--max_rows", + "-n", + type=int, + default=100, + help="How many rows to return in the result", +) +@click.option( + "--parallelism", + "-p", + type=int, + default=8, + help="How many threads to run backfill in parallel", +) +@click.option( + "--no_partition", + "--no-partition", + is_flag=True, + default=False, + help="Disable writing results to a partition. Overwrites entire destination table.", +) +@click.option( + "--destination_table", + "--destination-table", + required=False, + help=( + "Destination table name results are written to. " + + "If not set, determines destination table based on query." + ), +) +@click.pass_context +def backfill( + ctx, + name, + sql_dir, + project_id, + start_date, + end_date, + exclude, + dry_run, + max_rows, + parallelism, + no_partition, + destination_table, +): + """Run a backfill.""" + if not is_authenticated(): + click.echo( + "Authentication to GCP required. Run `gcloud auth login` " + "and check that the project is set correctly." 
+ ) + sys.exit(1) + + query_files = paths_matching_name_pattern(name, sql_dir, project_id) + if query_files == []: + # run SQL generators if no matching query has been found + ctx.invoke( + generate_all, + output_dir=ctx.obj["TMP_DIR"], + ignore=["derived_view_schemas", "stable_views", "country_code_lookup"], + ) + query_files = paths_matching_name_pattern(name, ctx.obj["TMP_DIR"], project_id) + + dates = [start_date + timedelta(i) for i in range((end_date - start_date).days + 1)] + + for query_file in query_files: + query_file_path = Path(query_file) + + depends_on_past = False + date_partition_parameter = "submission_date" + + try: + metadata = Metadata.of_query_file(str(query_file_path)) + depends_on_past = metadata.scheduling.get( + "depends_on_past", depends_on_past + ) + date_partition_parameter = metadata.scheduling.get( + "date_partition_parameter", date_partition_parameter + ) + + # For backwards compatibility assume partitioning type is day + # in case metadata is missing + if metadata.bigquery: + partitioning_type = metadata.bigquery.time_partitioning.type + else: + partitioning_type = PartitionType.DAY + click.echo( + "Bigquery partitioning type not set. Using PartitionType.DAY" + ) + + match partitioning_type: + case PartitionType.DAY: + dates = [ + start_date + timedelta(i) + for i in range((end_date - start_date).days + 1) + ] + case PartitionType.MONTH: + dates = list( + rrule( + freq=MONTHLY, + dtstart=start_date.replace(day=1), + until=end_date, + ) + ) + # Dates in excluded must be the first day of the month to match `dates` + exclude = [ + date.fromisoformat(day).replace(day=1).strftime("%Y-%m-%d") + for day in exclude + ] + case _: + raise ValueError( + f"Unsupported partitioning type: {partitioning_type}" + ) + + except FileNotFoundError: + click.echo(f"No metadata defined for {query_file_path}") + + if depends_on_past and exclude != []: + click.echo( + f"Warning: depends_on_past = True for {query_file_path} but the" + f"following dates will be excluded from the backfill: {exclude}" + ) + + client = bigquery.Client(project=project_id) + try: + project, dataset, table = extract_from_query_path(query_file_path) + client.get_table(f"{project}.{dataset}.{table}") + except NotFound: + ctx.invoke(initialize, name=query_file, dry_run=dry_run) + + backfill_query = partial( + _backfill_query, + query_file_path, + project_id, + date_partition_parameter, + exclude, + max_rows, + dry_run, + no_partition, + ctx.args, + partitioning_type, + destination_table=destination_table, + ) + + if not depends_on_past: + # run backfill for dates in parallel if depends_on_past is false + with Pool(parallelism) as p: + result = p.map(backfill_query, dates, chunksize=1) + if not all(result): + sys.exit(1) + else: + # if data depends on previous runs, then execute backfill sequentially + for backfill_date in dates: + backfill_query(backfill_date) + + +@query.command( + help="""Run a query. Additional parameters will get passed to bq.
+ If a destination_table is set, the query result will be written to BigQuery. Without a destination_table specified, the results are not stored.
+ If the `name` is not found within the `sql/` folder bqetl assumes it hasn't been generated yet + and will start the generating process for all `sql_generators/` files. + This generation process will take some time and run dryrun calls against BigQuery but this is expected.
+ Additional parameters (all parameters that are not specified in the Options) must come after the query-name. + Otherwise the first parameter that is not an option is interpreted as the query-name and since it can't be found the generation process will start. + + Examples: + + \b + # Run a query by name + ./bqetl query run telemetry_derived.ssl_ratios_v1 + + \b + # Run a query file + ./bqetl query run /path/to/query.sql + + \b + # Run a query and save the result to BigQuery + ./bqetl query run telemetry_derived.ssl_ratios_v1 \ + --project_id=moz-fx-data-shared-prod \ + --dataset_id=telemetry_derived \ + --destination_table=ssl_ratios_v1 + """, + context_settings=dict( + ignore_unknown_options=True, + allow_extra_args=True, + ), +) +@click.argument("name") +@sql_dir_option +@project_id_option() +@click.option( + "--public_project_id", + "--public-project-id", + default=ConfigLoader.get( + "default", "public_project", fallback="mozilla-public-data" + ), + help="Project with publicly accessible data", +) +@click.option( + "--destination_table", + "--destination-table", + required=False, + help=( + "Destination table name results are written to. " + + "If not set, the query result will not be written to BigQuery." + ), +) +@click.option( + "--dataset_id", + "--dataset-id", + required=False, + help=( + "Destination dataset results are written to. " + + "If not set, determines destination dataset based on query." + ), +) +@click.pass_context +def run( + ctx, + name, + sql_dir, + project_id, + public_project_id, + destination_table, + dataset_id, +): + """Run a query.""" + if not is_authenticated(): + click.echo( + "Authentication to GCP required. Run `gcloud auth login` " + "and check that the project is set correctly." + ) + sys.exit(1) + + query_files = paths_matching_name_pattern(name, sql_dir, project_id) + if query_files == []: + # run SQL generators if no matching query has been found + ctx.invoke( + generate_all, + output_dir=ctx.obj["TMP_DIR"], + ignore=["derived_view_schemas", "stable_views", "country_code_lookup"], + ) + query_files = paths_matching_name_pattern(name, ctx.obj["TMP_DIR"], project_id) + + _run_query( + query_files, + project_id, + public_project_id, + destination_table, + dataset_id, + ctx.args, + ) + + +def _run_query( + query_files, + project_id, + public_project_id, + destination_table, + dataset_id, + query_arguments, + addl_templates: typing.Optional[dict] = None, + mapped_values=None, +): + client = bigquery.Client(project_id) + """Run a query.""" + if dataset_id is not None: + # dataset ID was parsed by argparse but needs to be passed as parameter + # when running the query + query_arguments.append("--dataset_id={}".format(dataset_id)) + + if project_id is not None: + query_arguments.append(f"--project_id={project_id}") + + if addl_templates is None: + addl_templates = {} + + for query_file in query_files: + use_public_table = False + + query_file = Path(query_file) + try: + metadata = Metadata.of_query_file(query_file) + if metadata.is_public_bigquery(): + if not validate_metadata.validate_public_data(metadata, query_file): + sys.exit(1) + + # Change the destination table to write results to the public dataset; + # a view to the public table in the internal dataset is created + # when CI runs + if ( + dataset_id is not None + and destination_table is not None + and re.match(DESTINATION_TABLE_RE, destination_table) + ): + destination_table = "{}:{}.{}".format( + public_project_id, dataset_id, destination_table + ) + query_arguments.append( + 
"--destination_table={}".format(destination_table) + ) + use_public_table = True + else: + print( + "ERROR: Cannot run public dataset query. Parameters" + " --destination_table=
and" + " --dataset_id= required" + ) + sys.exit(1) + except yaml.YAMLError as e: + logging.error(e) + sys.exit(1) + except FileNotFoundError: + logging.warning("No metadata.yaml found for {}", query_file) + + if not use_public_table and destination_table is not None: + # destination table was parsed by argparse, however if it wasn't modified to + # point to a public table it needs to be passed as parameter for the query + + if re.match(QUALIFIED_TABLE_NAME_RE, destination_table): + project, dataset, table = qualified_table_name_matching( + destination_table + ) + destination_table = "{}:{}.{}".format(project, dataset, table) + + query_arguments.append("--destination_table={}".format(destination_table)) + + if bool(list(filter(lambda x: x.startswith("--parameter"), query_arguments))): + # need to do this as parameters are not supported with legacy sql + query_arguments.append("--use_legacy_sql=False") + + # this assumed query command should always be passed inside query_arguments + if "query" not in query_arguments: + query_arguments = ["query"] + query_arguments + + # write rendered query to a temporary file; + # query string cannot be passed directly to bq as SQL comments will be interpreted as CLI arguments + with tempfile.NamedTemporaryFile(mode="w+") as query_stream: + query_stream.write( + render_template( + query_file.name, + template_folder=str(query_file.parent), + templates_dir="", + format=False, + **addl_templates, + ) + ) + query_stream.seek(0) + query_content = query_stream.read() + + if mapped_values is not None and "{mapped_values}" in query_content: + # Format the query template and run the query. + print(f"Running for {mapped_values}...") + query = query_content.format(mapped_values=mapped_values) + job = client.query( + query=query, + job_config=bigquery.QueryJobConfig( + use_query_cache=False, + use_legacy_sql=False, + ), + ) + job.result() + else: + # run the query as shell command so that passed parameters can be used as is. + subprocess.check_call(["bq"] + query_arguments, stdin=query_stream) + + +@query.command( + help="""Run a multipart query. 
+ + Examples: + + \b + # Run a multipart query + ./bqetl query run_multipart /path/to/query.sql + """, + context_settings=dict( + ignore_unknown_options=True, + allow_extra_args=True, + ), +) +@click.argument( + "query_dir", + type=click.Path(file_okay=False), +) +@click.option( + "--using", + default="document_id", + help="comma separated list of join columns to use when combining results", +) +@click.option( + "--parallelism", + default=4, + type=int, + help="Maximum number of queries to execute concurrently", +) +@click.option( + "--dataset_id", + "--dataset-id", + help="Default dataset, if not specified all tables must be qualified with dataset", +) +@project_id_option() +@temp_dataset_option() +@click.option( + "--destination_table", + required=True, + help="table where combined results will be written", +) +@click.option( + "--time_partitioning_field", + type=lambda f: bigquery.TimePartitioning(field=f), + help="time partition field on the destination table", +) +@click.option( + "--clustering_fields", + type=lambda f: f.split(","), + help="comma separated list of clustering fields on the destination table", +) +@click.option( + "--dry_run", + "--dry-run", + is_flag=True, + default=False, + help="Print bytes that would be processed for each part and don't run queries", +) +@click.option( + "--parameters", + "--parameter", + multiple=True, + default=[], + type=lambda p: bigquery.ScalarQueryParameter(*p.split(":", 2)), + metavar="NAME:TYPE:VALUE", + help="query parameter(s) to pass when running parts", +) +@click.option( + "--priority", + default="INTERACTIVE", + type=click.Choice(["BATCH", "INTERACTIVE"]), + help=( + "Priority for BigQuery query jobs; BATCH priority will significantly slow " + "down queries if reserved slots are not enabled for the billing project; " + "defaults to INTERACTIVE" + ), +) +@click.option( + "--schema_update_options", + "--schema_update_option", + multiple=True, + type=click.Choice( + [ + bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION, + bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION, + # Airflow passes an empty string when the field addition date doesn't + # match the run date. + # See https://github.com/mozilla/telemetry-airflow/blob/ + # e49fa7e6b3f5ec562dd248d257770c2303cf0cba/dags/utils/gcp.py#L515 + "", + ] + ), + default=[], + help="Optional options for updating the schema.", +) +def run_multipart( + query_dir, + using, + parallelism, + dataset_id, + project_id, + temp_dataset, + destination_table, + time_partitioning_field, + clustering_fields, + dry_run, + parameters, + priority, + schema_update_options, +): + """Run a multipart query.""" + if dataset_id is not None and "." not in dataset_id and project_id is not None: + dataset_id = f"{project_id}.{dataset_id}" + if "." 
not in destination_table and dataset_id is not None: + destination_table = f"{dataset_id}.{destination_table}" + client = bigquery.Client(project_id) + with ThreadPool(parallelism) as pool: + parts = pool.starmap( + _run_part, + [ + ( + client, + part, + query_dir, + temp_dataset, + dataset_id, + dry_run, + parameters, + priority, + ) + for part in sorted(next(os.walk(query_dir))[2]) + if part.startswith("part") and part.endswith(".sql") + ], + chunksize=1, + ) + if not dry_run: + total_bytes = sum(job.total_bytes_processed for _, job in parts) + query = ( + f"SELECT\n *\nFROM\n `{sql_table_id(parts[0][1].destination)}`" + + "".join( + f"\nFULL JOIN\n `{sql_table_id(job.destination)}`" + f"\nUSING\n ({using})" + for _, job in parts[1:] + ) + ) + try: + job = client.query( + query=query, + job_config=bigquery.QueryJobConfig( + destination=destination_table, + time_partitioning=time_partitioning_field, + clustering_fields=clustering_fields, + write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, + use_legacy_sql=False, + priority=priority, + schema_update_options=schema_update_options, + ), + ) + job.result() + logging.info( + f"Processed {job.total_bytes_processed:,d} bytes to combine results" + ) + total_bytes += job.total_bytes_processed + logging.info(f"Processed {total_bytes:,d} bytes in total") + finally: + for _, job in parts: + client.delete_table(sql_table_id(job.destination).split("$")[0]) + logging.info(f"Deleted {len(parts)} temporary tables") + + +def _run_part( + client, part, query_dir, temp_dataset, dataset_id, dry_run, parameters, priority +): + """Run a query part.""" + with open(os.path.join(query_dir, part)) as sql_file: + query = sql_file.read() + job_config = bigquery.QueryJobConfig( + destination=temp_dataset.temp_table(), + default_dataset=dataset_id, + use_legacy_sql=False, + dry_run=dry_run, + query_parameters=parameters, + priority=priority, + allow_large_results=True, + ) + job = client.query(query=query, job_config=job_config) + if job.dry_run: + logging.info(f"Would process {job.total_bytes_processed:,d} bytes for {part}") + else: + job.result() + logging.info(f"Processed {job.total_bytes_processed:,d} bytes for {part}") + return part, job + + +@query.command( + help="""Validate a query. + Checks formatting, scheduling information and dry runs the query. 
+ + Examples: + + ./bqetl query validate telemetry_derived.clients_daily_v6 + + \b + # Validate query not in shared-prod + ./bqetl query validate \\ + --use_cloud_function=false \\ + --project_id=moz-fx-data-marketing-prod \\ + ga_derived.blogs_goals_v1 + """, +) +@click.argument("name", required=False) +@sql_dir_option +@project_id_option() +@use_cloud_function_option +@click.option( + "--validate_schemas", + "--validate-schemas", + help="Require dry run schema to match destination table and file if present.", + is_flag=True, + default=False, +) +@respect_dryrun_skip_option(default=False) +@no_dryrun_option(default=False) +@click.pass_context +def validate( + ctx, + name, + sql_dir, + project_id, + use_cloud_function, + validate_schemas, + respect_dryrun_skip, + no_dryrun, +): + """Validate queries by dry running, formatting and checking scheduling configs.""" + if name is None: + name = "*.*" + + query_files = paths_matching_name_pattern(name, sql_dir, project_id) + dataset_dirs = set() + for query in query_files: + ctx.invoke(format, paths=[str(query)]) + + if not no_dryrun: + ctx.invoke( + dryrun, + paths=[str(query)], + use_cloud_function=use_cloud_function, + project=project_id, + validate_schemas=validate_schemas, + respect_skip=respect_dryrun_skip, + ) + + validate_metadata.validate(query.parent) + dataset_dirs.add(query.parent.parent) + + if no_dryrun: + click.echo("Dry run skipped for query files.") + + for dataset_dir in dataset_dirs: + validate_metadata.validate_datasets(dataset_dir) + + +def _initialize_in_parallel( + project, + table, + dataset, + query_file, + arguments, + parallelism, + addl_templates, +): + mapped_values = [f"sample_id = {i}" for i in list(range(0, 100))] + + with ThreadPool(parallelism) as pool: + start = default_timer() + # Process all subsets in the query in parallel (eg. all sample_ids). + pool.map( + partial( + _run_query, + [query_file], + project, + None, + table, + dataset, + arguments, + addl_templates, + ), + mapped_values, + ) + print(f"Job completed in {default_timer() - start}") + + +@query.command( + help="""Create and initialize the destination table for the query. + Only for queries that have an `init.sql` file. + + Examples: + + ./bqetl query initialize telemetry_derived.ssl_ratios_v1 + """, +) +@click.argument("name") +@sql_dir_option +@project_id_option() +@click.option( + "--dry_run/--no_dry_run", + "--dry-run/--no-dry-run", + help="Dry run the initialization", +) +@click.pass_context +def initialize(ctx, name, sql_dir, project_id, dry_run): + """Create the destination table for the provided query.""" + if not is_authenticated(): + click.echo("Authentication required for creating tables.", err=True) + sys.exit(1) + + if Path(name).exists(): + # Allow name to be a path + query_files = [Path(name)] + else: + query_files = paths_matching_name_pattern(name, sql_dir, project_id) + + if not query_files: + click.echo( + f"Couldn't find directory matching `{name}`. Failed to initialize query.", + err=True, + ) + sys.exit(1) + + for query_file in query_files: + sql_content = query_file.read_text() + client = bigquery.Client() + + # Enable initialization from query.sql files + # Create the table by deploying the schema and metadata, then run the init. + # This does not currently verify the accuracy of the schema or that it + # matches the query. 
+ if "is_init()" in sql_content: + project = query_file.parent.parent.parent.name + dataset = query_file.parent.parent.name + destination_table = query_file.parent.name + full_table_id = f"{project}.{dataset}.{destination_table}" + + try: + table = client.get_table(full_table_id) + except NotFound: + table = bigquery.Table(full_table_id) + + if table.created: + raise PreconditionFailed( + f"Table {full_table_id} already exists. The initialization process is terminated." + ) + ctx.invoke(deploy, name=full_table_id, force=True) + + arguments = [ + "query", + "--use_legacy_sql=false", + "--replace", + "--format=none", + ] + if dry_run: + arguments += ["--dry_run"] + + if "parallel_run" in sql_content: + _initialize_in_parallel( + project=project, + table=destination_table, + dataset=dataset, + query_file=query_file, + arguments=arguments, + parallelism=DEFAULT_PARALLELISM, + addl_templates={ + "is_init": lambda: True, + "parallel_run": lambda: True, + }, + ) + else: + _run_query( + project_id=project, + public_project_id=None, + destination_table=destination_table, + dataset_id=dataset, + query_arguments=arguments, + addl_templates={ + "is_init": lambda: True, + }, + ) + else: + init_files = Path(query_file.parent).rglob("init.sql") + + for init_file in init_files: + project = init_file.parent.parent.parent.name + + with open(init_file) as init_file_stream: + init_sql = init_file_stream.read() + dataset = Path(init_file).parent.parent.name + destination_table = query_file.parent.name + job_config = bigquery.QueryJobConfig( + dry_run=dry_run, + default_dataset=f"{project}.{dataset}", + destination=f"{project}.{dataset}.{destination_table}", + ) + + if "CREATE MATERIALIZED VIEW" in init_sql: + click.echo(f"Create materialized view for {init_file}") + # existing materialized view have to be deleted before re-creation + view_name = query_file.parent.name + client.delete_table( + f"{project}.{dataset}.{view_name}", not_found_ok=True + ) + else: + click.echo(f"Create destination table for {init_file}") + + job = client.query(init_sql, job_config=job_config) + + if not dry_run: + job.result() + + +@query.command( + help="""Render a query Jinja template. + + Examples: + + ./bqetl query render telemetry_derived.ssl_ratios_v1 \\ + --output-dir=/tmp + """, + context_settings=dict( + ignore_unknown_options=True, + allow_extra_args=True, + ), +) +@click.argument("name") +@sql_dir_option +@click.option( + "--output-dir", + "--output_dir", + help="Output directory generated SQL is written to. 
" + + "If not specified, rendered queries are printed to console.", + type=click.Path(file_okay=False), + required=False, +) +def render(name, sql_dir, output_dir): + """Render a query Jinja template.""" + if name is None: + name = "*.*" + + query_files = paths_matching_name_pattern(name, sql_dir, project_id=None) + resolved_sql_dir = Path(sql_dir).resolve() + for query_file in query_files: + table_name = query_file.parent.name + dataset_id = query_file.parent.parent.name + project_id = query_file.parent.parent.parent.name + + jinja_params = { + **{ + "project_id": project_id, + "dataset_id": dataset_id, + "table_name": table_name, + }, + } + + rendered_sql = ( + render_template( + query_file.name, + template_folder=query_file.parent, + templates_dir="", + format=False, + **jinja_params, + ) + + "\n" + ) + + if not any(s in str(query_file) for s in skip_format()): + rendered_sql = reformat(rendered_sql, trailing_newline=True) + + if output_dir: + output_file = output_dir / query_file.resolve().relative_to( + resolved_sql_dir + ) + output_file.parent.mkdir(parents=True, exist_ok=True) + output_file.write_text(rendered_sql) + else: + click.echo(query_file) + click.echo(rendered_sql) + + +def _parse_partition_setting(partition_date): + params = partition_date.split(":") + if len(params) != 3: + return None + + # Check date format + try: + datetime.datetime.strptime(params[2], "%Y-%m-%d").date() + except ValueError: + return None + + # Check column name + if re.match(r"^\w+$", params[0]): + return {params[0]: params[2]} + + +def _validate_partition_date(ctx, param, partition_date): + """Process the CLI parameter check_date and set the parameter for BigQuery.""" + # Will be None if launched from Airflow. Also ctx.args is not populated at this stage. + if partition_date: + parsed = _parse_partition_setting(partition_date) + if parsed is None: + raise click.BadParameter("Format must be ::") + return parsed + return None + + +def _parse_check_output(output: str) -> str: + output = output.replace("\n", " ") + if "ETL Data Check Failed:" in output: + return f"ETL Data Check Failed:{output.split('ETL Data Check Failed:')[1]}" + return output + + +@query.group(help="Commands for managing query schemas.") +def schema(): + """Create the CLI group for the query schema command.""" + pass + + +@schema.command( + help=""" + Update the query schema based on the destination table schema and the query schema. + If no schema.yaml file exists for a query, one will be created. + + Examples: + + ./bqetl query schema update telemetry_derived.clients_daily_v6 + + # Update schema including downstream dependencies (requires GCP) + ./bqetl query schema update telemetry_derived.clients_daily_v6 --update-downstream + """, +) +@click.argument("name") +@sql_dir_option +@click.option( + "--project-id", + "--project_id", + help="GCP project ID", + default=ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod"), + callback=is_valid_project, +) +@click.option( + "--update-downstream", + "--update_downstream", + help="Update downstream dependencies. 
GCP authentication required.", + default=False, + is_flag=True, +) +@click.option( + "--tmp-dataset", + "--tmp_dataset", + help="GCP datasets for creating updated tables temporarily.", + default="tmp", +) +@use_cloud_function_option +@respect_dryrun_skip_option(default=True) +@parallelism_option +def update( + name, + sql_dir, + project_id, + update_downstream, + tmp_dataset, + use_cloud_function, + respect_dryrun_skip, + parallelism, +): + """CLI command for generating the query schema.""" + if not is_authenticated(): + click.echo( + "Authentication to GCP required. Run `gcloud auth login` " + "and check that the project is set correctly." + ) + sys.exit(1) + query_files = paths_matching_name_pattern( + name, sql_dir, project_id, files=["query.sql"] + ) + dependency_graph = get_dependency_graph([sql_dir], without_views=True) + manager = multiprocessing.Manager() + tmp_tables = manager.dict({}) + + # order query files to make sure derived_from dependencies are resolved + query_file_graph = {} + for query_file in query_files: + query_file_graph[query_file] = [] + try: + metadata = Metadata.of_query_file(str(query_file)) + if metadata and metadata.schema and metadata.schema.derived_from: + for derived_from in metadata.schema.derived_from: + parent_queries = [ + query + for query in paths_matching_name_pattern( + ".".join(derived_from.table), sql_dir, project_id + ) + ] + + if len(parent_queries) > 0: + query_file_graph[query_file].append(parent_queries[0]) + + except FileNotFoundError: + query_file_graph[query_file] = [] + + ts = ParallelTopologicalSorter( + query_file_graph, parallelism=parallelism, with_follow_up=update_downstream + ) + ts.map( + partial( + _update_query_schema_with_downstream, + sql_dir, + project_id, + tmp_dataset, + dependency_graph, + tmp_tables, + use_cloud_function, + respect_dryrun_skip, + update_downstream, + ) + ) + + if len(tmp_tables) > 0: + client = bigquery.Client() + # delete temporary tables + for _, table in tmp_tables.items(): + client.delete_table(table, not_found_ok=True) + + +def _update_query_schema_with_downstream( + sql_dir, + project_id, + tmp_dataset, + dependency_graph, + tmp_tables={}, + use_cloud_function=True, + respect_dryrun_skip=True, + update_downstream=False, + query_file=None, + follow_up_queue=None, +): + try: + changed = _update_query_schema( + query_file, + sql_dir, + project_id, + tmp_dataset, + tmp_tables, + use_cloud_function, + respect_dryrun_skip, + ) + + if update_downstream: + # update downstream dependencies + if changed: + if not is_authenticated(): + click.echo( + "Cannot update downstream dependencies." + "Authentication to GCP required. Run `gcloud auth login` " + "and check that the project is set correctly." 
+ ) + sys.exit(1) + + project, dataset, table = extract_from_query_path(query_file) + identifier = f"{project}.{dataset}.{table}" + tmp_identifier = f"{project}.{tmp_dataset}.{table}_{random_str(12)}" + + # create temporary table with updated schema + if identifier not in tmp_tables: + schema = Schema.from_schema_file(query_file.parent / SCHEMA_FILE) + schema.deploy(tmp_identifier) + tmp_tables[identifier] = tmp_identifier + + # get downstream dependencies that will be updated in the next iteration + dependencies = [ + p + for k, refs in dependency_graph.items() + for p in paths_matching_name_pattern( + k, sql_dir, project_id, files=("query.sql",) + ) + if identifier in refs + ] + + for d in dependencies: + click.echo(f"Update downstream dependency schema for {d}") + if follow_up_queue: + follow_up_queue.put(d) + except Exception: + print_exc() + + +def _update_query_schema( + query_file, + sql_dir, + project_id, + tmp_dataset, + tmp_tables={}, + use_cloud_function=True, + respect_dryrun_skip=True, +): + """ + Update the schema of a specific query file. + + Return True if the schema changed, False if it is unchanged. + """ + if respect_dryrun_skip and str(query_file) in DryRun.skipped_files(): + click.echo(f"{query_file} dry runs are skipped. Cannot update schemas.") + return + + tmp_tables = copy.deepcopy(tmp_tables) + query_file_path = Path(query_file) + existing_schema_path = query_file_path.parent / SCHEMA_FILE + project_name, dataset_name, table_name = extract_from_query_path(query_file_path) + + try: + metadata = Metadata.of_query_file(str(query_file_path)) + except FileNotFoundError: + metadata = None + click.echo(f"No metadata defined for {query_file_path}") + + # pull in updates from parent schemas + if metadata and metadata.schema and metadata.schema.derived_from: + for derived_from in metadata.schema.derived_from: + parent_queries = [ + query + for query in paths_matching_name_pattern( + ".".join(derived_from.table), sql_dir, project_id + ) + ] + + if len(parent_queries) == 0: + click.echo( + f"derived_from query {derived_from.table} does not exist.", + err=True, + ) + else: + parent_schema = Schema.from_schema_file( + parent_queries[0].parent / SCHEMA_FILE + ) + parent_project, parent_dataset, parent_table = extract_from_query_path( + parent_queries[0] + ) + parent_identifier = f"{parent_project}.{parent_dataset}.{parent_table}" + + if parent_identifier not in tmp_tables: + tmp_parent_identifier = ( + f"{parent_project}.{tmp_dataset}.{parent_table}_" + + random_str(12) + ) + parent_schema.deploy(tmp_parent_identifier) + tmp_tables[parent_identifier] = tmp_parent_identifier + + if existing_schema_path.is_file(): + existing_schema = Schema.from_schema_file(existing_schema_path) + else: + existing_schema = Schema.empty() + + existing_schema.merge(parent_schema, exclude=derived_from.exclude) + + # use temporary table + tmp_identifier = ( + f"{project_name}.{tmp_dataset}.{table_name}_{random_str(12)}" + ) + existing_schema.deploy(tmp_identifier) + tmp_tables[ + f"{project_name}.{dataset_name}.{table_name}" + ] = tmp_identifier + existing_schema.to_yaml_file(existing_schema_path) + + # replace temporary table references + sql_content = render_template( + query_file_path.name, + template_folder=str(query_file_path.parent), + templates_dir="", + format=False, + ) + + for orig_table, tmp_table in tmp_tables.items(): + table_parts = orig_table.split(".") + for i in range(len(table_parts)): + if ".".join(table_parts[i:]) in sql_content: + sql_content = 
sql_content.replace(".".join(table_parts[i:]), tmp_table) + break + + query_schema = None + try: + query_schema = Schema.from_query_file( + query_file_path, + content=sql_content, + use_cloud_function=use_cloud_function, + respect_skip=respect_dryrun_skip, + ) + except Exception: + if not existing_schema_path.exists(): + click.echo( + click.style( + f"Cannot automatically update {query_file_path}. " + f"Please update {query_file_path / SCHEMA_FILE} manually.", + fg="red", + ), + err=True, + ) + return + + # update bigquery metadata + try: + client = bigquery.Client() + table = client.get_table(f"{project_name}.{dataset_name}.{table_name}") + metadata_file_path = query_file_path.parent / METADATA_FILE + + if ( + table.time_partitioning + and metadata + and ( + metadata.bigquery is None or metadata.bigquery.time_partitioning is None + ) + ): + metadata.set_bigquery_partitioning( + field=table.time_partitioning.field, + partition_type=table.time_partitioning.type_.lower(), + required=table.time_partitioning.require_partition_filter, + expiration_days=table.time_partitioning.expiration_ms / 86400000.0 + if table.time_partitioning.expiration_ms + else None, + ) + click.echo(f"Partitioning metadata added to {metadata_file_path}") + + if ( + table.clustering_fields + and metadata + and (metadata.bigquery is None or metadata.bigquery.clustering is None) + ): + metadata.set_bigquery_clustering(table.clustering_fields) + click.echo(f"Clustering metadata added to {metadata_file_path}") + + if metadata: + metadata.write(metadata_file_path) + except NotFound: + click.echo( + f"Destination table {project_name}.{dataset_name}.{table_name} " + "does not exist in BigQuery. Run bqetl query schema deploy " + ".
to create the destination table." + ) + except FileNotFoundError: + click.echo( + f"No metadata file for {project_name}.{dataset_name}.{table_name}." + " Skip schema update." + ) + return + + partitioned_by = None + try: + metadata = Metadata.of_query_file(query_file_path) + + if metadata.bigquery and metadata.bigquery.time_partitioning: + partitioned_by = metadata.bigquery.time_partitioning.field + except FileNotFoundError: + pass + + table_schema = Schema.for_table( + project_name, + dataset_name, + table_name, + partitioned_by, + use_cloud_function=use_cloud_function, + respect_skip=respect_dryrun_skip, + ) + + changed = True + + if existing_schema_path.is_file(): + existing_schema = Schema.from_schema_file(existing_schema_path) + old_schema = copy.deepcopy(existing_schema) + if table_schema: + existing_schema.merge(table_schema) + + if query_schema: + existing_schema.merge(query_schema) + existing_schema.to_yaml_file(existing_schema_path) + changed = not existing_schema.equal(old_schema) + else: + query_schema.merge(table_schema) + query_schema.to_yaml_file(existing_schema_path) + + click.echo(f"Schema {existing_schema_path} updated.") + return changed + + +@schema.command( + help="""Deploy the query schema. + + Examples: + + ./bqetl query schema deploy telemetry_derived.clients_daily_v6 + """, +) +@click.argument("name") +@sql_dir_option +@click.option( + "--project-id", + "--project_id", + help="GCP project ID", + default=ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod"), + callback=is_valid_project, +) +@click.option( + "--force/--noforce", + help="Deploy the schema file without validating that it matches the query", + default=False, +) +@use_cloud_function_option +@respect_dryrun_skip_option(default=True) +@click.option( + "--skip-existing", + "--skip_existing", + help="Skip updating existing tables. " + + "This option ensures that only new tables get deployed.", + default=False, + is_flag=True, +) +@click.option( + "--skip-external-data", + "--skip_external_data", + help="Skip publishing external data, such as Google Sheets.", + default=False, + is_flag=True, +) +@click.option( + "--destination_table", + "--destination-table", + required=False, + help=( + "Destination table name results are written to. " + + "If not set, determines destination table based on query. " + + "Must be fully qualified (project.dataset.table)." + ), +) +@parallelism_option +@click.pass_context +def deploy( + ctx, + name, + sql_dir, + project_id, + force, + use_cloud_function, + respect_dryrun_skip, + skip_existing, + skip_external_data, + destination_table, + parallelism, +): + """CLI command for deploying destination table schemas.""" + if not is_authenticated(): + click.echo( + "Authentication to GCP required. Run `gcloud auth login` " + "and check that the project is set correctly." + ) + sys.exit(1) + client = bigquery.Client() + + query_files = paths_matching_name_pattern(name, sql_dir, project_id, ["query.*"]) + if not query_files: + # run SQL generators if no matching query has been found + ctx.invoke( + generate_all, + output_dir=ctx.obj["TMP_DIR"], + ignore=["derived_view_schemas", "stable_views"], + ) + query_files = paths_matching_name_pattern( + name, ctx.obj["TMP_DIR"], project_id, ["query.*"] + ) + + def _deploy(query_file): + if respect_dryrun_skip and str(query_file) in DryRun.skipped_files(): + click.echo(f"{query_file} dry runs are skipped. 
Cannot validate schemas.") + return + + query_file_path = Path(query_file) + existing_schema_path = query_file_path.parent / SCHEMA_FILE + + if not existing_schema_path.is_file(): + click.echo(f"No schema file found for {query_file}") + return + + try: + table_name = query_file_path.parent.name + dataset_name = query_file_path.parent.parent.name + project_name = query_file_path.parent.parent.parent.name + + if destination_table: + full_table_id = destination_table + else: + full_table_id = f"{project_name}.{dataset_name}.{table_name}" + + existing_schema = Schema.from_schema_file(existing_schema_path) + + if not force and str(query_file_path).endswith("query.sql"): + query_schema = Schema.from_query_file( + query_file_path, + use_cloud_function=use_cloud_function, + respect_skip=respect_dryrun_skip, + ) + if not existing_schema.equal(query_schema): + click.echo( + f"Query {query_file_path} does not match " + f"schema in {existing_schema_path}. " + f"To update the local schema file, " + f"run `./bqetl query schema update " + f"{dataset_name}.{table_name}`", + err=True, + ) + sys.exit(1) + + with NamedTemporaryFile(suffix=".json") as tmp_schema_file: + existing_schema.to_json_file(Path(tmp_schema_file.name)) + bigquery_schema = client.schema_from_json(tmp_schema_file.name) + + try: + table = client.get_table(full_table_id) + except NotFound: + table = bigquery.Table(full_table_id) + + table.schema = bigquery_schema + _attach_metadata(query_file_path, table) + + if not table.created: + client.create_table(table) + click.echo(f"Destination table {full_table_id} created.") + elif not skip_existing: + client.update_table( + table, + [ + "schema", + "friendly_name", + "description", + "time_partitioning", + "clustering_fields", + "labels", + ], + ) + click.echo(f"Schema (and metadata) updated for {full_table_id}.") + except Exception: + print_exc() + return query_file + + with ThreadPool(parallelism) as pool: + failed_deploys = [r for r in pool.map(_deploy, query_files) if r] + + if not skip_external_data: + failed_external_deploys = _deploy_external_data( + name, sql_dir, project_id, skip_existing + ) + failed_deploys += failed_external_deploys + + if len(failed_deploys) > 0: + click.echo("The following tables could not be deployed:") + for failed_deploy in failed_deploys: + click.echo(failed_deploy) + sys.exit(1) + + click.echo("All tables have been deployed.") + + +def _attach_metadata(query_file_path: Path, table: bigquery.Table) -> None: + """Add metadata from query file's metadata.yaml to table object.""" + try: + metadata = Metadata.of_query_file(query_file_path) + except FileNotFoundError: + return + + table.description = metadata.description + table.friendly_name = metadata.friendly_name + + if metadata.bigquery and metadata.bigquery.time_partitioning: + table.time_partitioning = bigquery.TimePartitioning( + metadata.bigquery.time_partitioning.type.bigquery_type, + field=metadata.bigquery.time_partitioning.field, + require_partition_filter=( + metadata.bigquery.time_partitioning.require_partition_filter + ), + expiration_ms=metadata.bigquery.time_partitioning.expiration_ms, + ) + + if metadata.bigquery and metadata.bigquery.clustering: + table.clustering_fields = metadata.bigquery.clustering.fields + + # BigQuery only allows for string type labels with specific requirements to be published: + # https://cloud.google.com/bigquery/docs/labels-intro#requirements + if metadata.labels: + table.labels = { + key: value + for key, value in metadata.labels.items() + if isinstance(value, str) + } + 
+ +def _deploy_external_data( + name, + sql_dir, + project_id, + skip_existing, +) -> list: + """Publish external data tables.""" + # whether a table should be created from external data is defined in the metadata + metadata_files = paths_matching_name_pattern( + name, sql_dir, project_id, ["metadata.yaml"] + ) + client = bigquery.Client() + failed_deploys = [] + for metadata_file_path in metadata_files: + metadata = Metadata.from_file(metadata_file_path) + if not metadata.external_data: + # skip all tables that are not created from external data + continue + + existing_schema_path = metadata_file_path.parent / SCHEMA_FILE + + if not existing_schema_path.is_file(): + # tables created from external data must specify a schema + click.echo(f"No schema file found for {metadata_file_path}") + continue + + try: + table_name = metadata_file_path.parent.name + dataset_name = metadata_file_path.parent.parent.name + project_name = metadata_file_path.parent.parent.parent.name + full_table_id = f"{project_name}.{dataset_name}.{table_name}" + + existing_schema = Schema.from_schema_file(existing_schema_path) + + try: + table = client.get_table(full_table_id) + except NotFound: + table = bigquery.Table(full_table_id) + + with NamedTemporaryFile(suffix=".json") as tmp_schema_file: + existing_schema.to_json_file(Path(tmp_schema_file.name)) + bigquery_schema = client.schema_from_json(tmp_schema_file.name) + + table.schema = bigquery_schema + _attach_metadata(metadata_file_path, table) + + if not table.created: + if metadata.external_data.format in ( + ExternalDataFormat.GOOGLE_SHEETS, + ExternalDataFormat.CSV, + ): + external_config = bigquery.ExternalConfig( + metadata.external_data.format.value.upper() + ) + external_config.source_uris = metadata.external_data.source_uris + external_config.ignore_unknown_values = True + external_config.autodetect = False + + for key, v in metadata.external_data.options.items(): + setattr(external_config.options, key, v) + + table.external_data_configuration = external_config + table = client.create_table(table) + click.echo(f"Destination table {full_table_id} created.") + + else: + click.echo( + f"External data format {metadata.external_data.format} unsupported." 
+ ) + elif not skip_existing: + client.update_table( + table, + [ + "schema", + "friendly_name", + "description", + "labels", + ], + ) + click.echo(f"Schema (and metadata) updated for {full_table_id}.") + except Exception: + print_exc() + failed_deploys.append(metadata_file_path) + + return failed_deploys + + +def _validate_schema_from_path( + query_file_path, use_cloud_function=True, respect_dryrun_skip=True +): + """Dry Runs and validates a query schema from its path.""" + return ( + DryRun( + query_file_path, + use_cloud_function=use_cloud_function, + respect_skip=respect_dryrun_skip, + ).validate_schema(), + query_file_path, + ) + + +@schema.command( + help="""Validate the query schema + + Examples: + + ./bqetl query schema validate telemetry_derived.clients_daily_v6 + """, + name="validate", +) +@click.argument("name") +@sql_dir_option +@click.option( + "--project-id", + "--project_id", + help="GCP project ID", + default=ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod"), + callback=is_valid_project, +) +@use_cloud_function_option +@respect_dryrun_skip_option(default=True) +@click.pass_context +def validate_schema( + ctx, name, sql_dir, project_id, use_cloud_function, respect_dryrun_skip +): + """Validate the defined query schema with the query and the destination table.""" + query_files = paths_matching_name_pattern(name, sql_dir, project_id) + if query_files == []: + # run SQL generators if no matching query has been found + ctx.invoke( + generate_all, + output_dir=ctx.obj["TMP_DIR"], + ignore=["derived_view_schemas", "stable_views"], + ) + query_files = paths_matching_name_pattern(name, ctx.obj["TMP_DIR"], project_id) + + _validate_schema = partial( + _validate_schema_from_path, + use_cloud_function=use_cloud_function, + respect_dryrun_skip=respect_dryrun_skip, + ) + + with Pool(8) as p: + result = p.map(_validate_schema, query_files, chunksize=1) + + all_valid = True + + for is_valid, query_file_path in result: + if is_valid is False: + if all_valid: + click.echo("\nSchemas for the following queries are invalid:") + all_valid = False + click.echo(query_file_path) + + if not all_valid: + sys.exit(1) + else: + click.echo("\nAll schemas are valid.") diff --git a/backfill/2023-09-26-initialize-clients_first_seen_v2/telemetry_derived_clients_first_seen_v2_query.sql b/backfill/2023-09-26-initialize-clients_first_seen_v2/telemetry_derived_clients_first_seen_v2_query.sql new file mode 100644 index 0000000..5b58819 --- /dev/null +++ b/backfill/2023-09-26-initialize-clients_first_seen_v2/telemetry_derived_clients_first_seen_v2_query.sql @@ -0,0 +1,360 @@ +-- Query for telemetry_derived.clients_first_seen_v2 +{% if is_init() and parallel_run() %} +INSERT INTO + `moz-fx-data-shared-prod.telemetry_derived.clients_first_seen_v2` +{% endif %} +-- Query for telemetry_derived.clients_first_seen_v2 +-- Each ping type subquery retrieves all attributes as reported on the first +-- ping received and respecting NULLS. +-- Once the first_seen_date is identified after comparing all pings, attributes +-- are retrieved for each client_id from the ping type that reported it. 
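+-- Illustrative note (not part of the generated SQL): the recurring pattern
+--   ARRAY_AGG(<column> RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)]
+-- in the per-ping CTEs below picks <column> as reported on the earliest ping in
+-- each client_id/sample_id group, keeping a NULL if that earliest ping reported
+-- NULL ("respecting NULLS" above); IGNORE NULLS would instead fall through to
+-- the first non-NULL value. The main_ping CTE applies the same pattern ordered
+-- by submission_date, since clients_daily_v6 is a daily aggregate.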
+WITH new_profile_ping AS ( + SELECT + client_id AS client_id, + sample_id AS sample_id, + MIN(submission_timestamp) AS first_seen_timestamp, + ARRAY_AGG(DATE(submission_timestamp) ORDER BY submission_timestamp ASC) AS all_dates, + ARRAY_AGG(application.architecture RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS architecture, + ARRAY_AGG(environment.build.build_id RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS app_build_id, + ARRAY_AGG(normalized_app_name RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS app_name, + ARRAY_AGG(environment.settings.locale RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS locale, + ARRAY_AGG(application.platform_version RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS platform_version, + ARRAY_AGG(application.vendor RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS vendor, + ARRAY_AGG(application.version RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS app_version, + ARRAY_AGG(application.xpcom_abi RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS xpcom_abi, + ARRAY_AGG(document_id RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS document_id, + ARRAY_AGG(environment.partner.distribution_id RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS distribution_id, + ARRAY_AGG(environment.partner.distribution_version RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS partner_distribution_version, + ARRAY_AGG(environment.partner.distributor RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS partner_distributor, + ARRAY_AGG(environment.partner.distributor_channel RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS partner_distributor_channel, + ARRAY_AGG(environment.partner.partner_id RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS partner_id, + ARRAY_AGG(environment.settings.attribution.campaign RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS attribution_campaign, + ARRAY_AGG(environment.settings.attribution.content RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS attribution_content, + ARRAY_AGG(environment.settings.attribution.experiment RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS attribution_experiment, + ARRAY_AGG(environment.settings.attribution.medium RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS attribution_medium, + ARRAY_AGG(environment.settings.attribution.source RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS attribution_source, + ARRAY_AGG(environment.settings.attribution.ua RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS attribution_ua, + ARRAY_AGG(environment.settings.default_search_engine_data.load_path RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS engine_data_load_path, + ARRAY_AGG(environment.settings.default_search_engine_data.name RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS engine_data_name, + ARRAY_AGG(environment.settings.default_search_engine_data.origin RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS engine_data_origin, + ARRAY_AGG(environment.settings.default_search_engine_data.submission_url RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS engine_data_submission_url, + ARRAY_AGG(environment.system.apple_model_id RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS apple_model_id, + ARRAY_AGG(metadata.geo.city RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] 
AS city, + ARRAY_AGG(metadata.geo.db_version RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS db_version, + ARRAY_AGG(metadata.geo.subdivision1 RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS subdivision1, + ARRAY_AGG(normalized_channel RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS normalized_channel, + ARRAY_AGG(normalized_country_code RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS country, + ARRAY_AGG(normalized_os RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS normalized_os, + ARRAY_AGG(normalized_os_version RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS normalized_os_version, + ARRAY_AGG(payload.processes.parent.scalars.startup_profile_selection_reason RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS startup_profile_selection_reason, + ARRAY_AGG(environment.settings.attribution.dltoken RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS attribution_dltoken, + ARRAY_AGG(environment.settings.attribution.dlsource RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS attribution_dlsource, + FROM + `moz-fx-data-shared-prod.telemetry.new_profile` + WHERE + {% if is_init() %} + DATE(submission_timestamp) >= '2010-01-01' + {% if parallel_run() %} + AND {mapped_values} + {% endif %} + {% else %} + DATE(submission_timestamp) = @submission_date + {% endif %} + GROUP BY + client_id, + sample_id +), +shutdown_ping AS ( + SELECT + client_id AS client_id, + sample_id AS sample_id, + MIN(submission_timestamp) AS first_seen_timestamp, + ARRAY_AGG(DATE(submission_timestamp) ORDER BY submission_timestamp ASC) AS all_dates, + ARRAY_AGG(application.architecture RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS architecture, + ARRAY_AGG(environment.build.build_id RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS app_build_id, + ARRAY_AGG(normalized_app_name RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS app_name, + ARRAY_AGG(environment.settings.locale RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS locale, + ARRAY_AGG(application.platform_version RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS platform_version, + ARRAY_AGG(application.vendor RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS vendor, + ARRAY_AGG(application.version RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS app_version, + ARRAY_AGG(application.xpcom_abi RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS xpcom_abi, + ARRAY_AGG(document_id RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS document_id, + ARRAY_AGG(environment.partner.distribution_id RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS distribution_id, + ARRAY_AGG(environment.partner.distribution_version RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS partner_distribution_version, + ARRAY_AGG(environment.partner.distributor RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS partner_distributor, + ARRAY_AGG(environment.partner.distributor_channel RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS partner_distributor_channel, + ARRAY_AGG(environment.partner.partner_id RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS partner_id, + ARRAY_AGG(environment.settings.attribution.campaign RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS attribution_campaign, + ARRAY_AGG(environment.settings.attribution.content RESPECT NULLS ORDER BY 
submission_timestamp)[SAFE_OFFSET(0)] AS attribution_content, + ARRAY_AGG(environment.settings.attribution.experiment RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS attribution_experiment, + ARRAY_AGG(environment.settings.attribution.medium RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS attribution_medium, + ARRAY_AGG(environment.settings.attribution.source RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS attribution_source, + ARRAY_AGG(environment.settings.attribution.ua RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS attribution_ua, + ARRAY_AGG(environment.settings.default_search_engine_data.load_path RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS engine_data_load_path, + ARRAY_AGG(environment.settings.default_search_engine_data.name RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS engine_data_name, + ARRAY_AGG(environment.settings.default_search_engine_data.origin RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS engine_data_origin, + ARRAY_AGG(environment.settings.default_search_engine_data.submission_url RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS engine_data_submission_url, + ARRAY_AGG(environment.system.apple_model_id RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS apple_model_id, + ARRAY_AGG(metadata.geo.city RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS city, + ARRAY_AGG(metadata.geo.db_version RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS db_version, + ARRAY_AGG(metadata.geo.subdivision1 RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS subdivision1, + ARRAY_AGG(normalized_channel RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS normalized_channel, + ARRAY_AGG(normalized_country_code RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS country, + ARRAY_AGG(normalized_os RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS normalized_os, + ARRAY_AGG(normalized_os_version RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS normalized_os_version, + ARRAY_AGG(payload.processes.parent.scalars.startup_profile_selection_reason RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS startup_profile_selection_reason, + ARRAY_AGG(environment.settings.attribution.dltoken RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS attribution_dltoken, + ARRAY_AGG(environment.settings.attribution.dlsource RESPECT NULLS ORDER BY submission_timestamp)[SAFE_OFFSET(0)] AS attribution_dlsource, + FROM + `moz-fx-data-shared-prod.telemetry.first_shutdown` + WHERE + {% if is_init() %} + DATE(submission_timestamp) >= '2010-01-01' + {% if parallel_run() %} + AND {mapped_values} + {% endif %} + {% else %} + DATE(submission_timestamp) = @submission_date + {% endif %} + GROUP BY + client_id, + sample_id +), +main_ping AS ( + -- The columns set as NULL are not available in clients_daily_v6 and need to be + -- retrieved in the ETL from telemetry_stable.main_v4:. + SELECT + client_id AS client_id, + sample_id AS sample_id, + -- The submission_timestamp_min is used to compare with the TIMESTAMP of + -- the new_profile and first shutdown pings. + -- It was implemented on Dec 16, 2019 and has data from 2018-10-30. 
+ IF( + MIN(submission_date) >= '2018-10-30', + MIN(submission_timestamp_min), + TIMESTAMP(MIN(submission_date)) + ) AS first_seen_timestamp, + ARRAY_AGG(DATE(submission_date) ORDER BY submission_date ASC) AS all_dates, + CAST(NULL AS STRING) AS architecture, -- main_v4:environment.build.architecture + ARRAY_AGG(env_build_id RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS app_build_id, + ARRAY_AGG(app_name RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS app_name, + ARRAY_AGG(locale RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS locale, + CAST(NULL AS STRING) AS platform_version, -- main_v4:environment.build.platform_version + ARRAY_AGG(vendor RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS vendor, + ARRAY_AGG(app_version RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS app_version, + CAST(NULL AS STRING) AS xpcom_abi, -- main_v4:environment.build.xpcom_abi / application.xpcom_abi + CAST(NULL AS STRING) AS document_id, -- main_v4:document_id + ARRAY_AGG(distribution_id RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS distribution_id, + CAST(NULL AS STRING) AS partner_distribution_version, -- main_v4:environment.partner.distribution_version + CAST(NULL AS STRING) AS partner_distributor, -- main_v4:environment.partner.distributor + CAST(NULL AS STRING) AS partner_distributor_channel, -- main_v4:environment.partner.distributor_channel + CAST(NULL AS STRING) AS partner_id, -- main_v4:environment.partner.distribution_id + ARRAY_AGG(attribution.campaign RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS attribution_campaign, + ARRAY_AGG(attribution.content RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS attribution_content, + ARRAY_AGG(attribution.experiment RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS attribution_experiment, + ARRAY_AGG(attribution.medium RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS attribution_medium, + ARRAY_AGG(attribution.source RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS attribution_source, + CAST(NULL AS STRING) AS attribution_ua, -- main_v4:environment.settings.attribution.ua + ARRAY_AGG(default_search_engine_data_load_path RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS engine_data_load_path, + ARRAY_AGG(default_search_engine_data_name RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS engine_data_name, + ARRAY_AGG(default_search_engine_data_origin RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS engine_data_origin, + ARRAY_AGG(default_search_engine_data_submission_url RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS engine_data_submission_url, + CAST(NULL AS STRING) AS apple_model_id, -- main_v4:environment.system.apple_model_id + ARRAY_AGG(city RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS city, + CAST(NULL AS STRING) AS db_version, -- main_v4:metadata.geo.db_version + ARRAY_AGG(geo_subdivision1 RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS subdivision1, + ARRAY_AGG(normalized_channel RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS normalized_channel, + ARRAY_AGG(country RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS country, + ARRAY_AGG(os RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS normalized_os, + ARRAY_AGG(normalized_os_version RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS normalized_os_version, + CAST(NULL AS STRING) AS startup_profile_selection_reason, -- 
main_v4:payload.processes.parent.scalars.startup_profile_selection_reason + ARRAY_AGG(attribution.dltoken RESPECT NULLS ORDER BY submission_date)[SAFE_OFFSET(0)] AS attribution_dltoken, + CAST(NULL AS STRING) AS attribution_dlsource -- main_v4:environment.settings.attribution.dlsource + FROM + `moz-fx-data-shared-prod.telemetry_derived.clients_daily_v6` + WHERE + {% if is_init() %} + submission_date >= '2010-01-01' + {% if parallel_run() %} + AND {mapped_values} + {% endif %} + {% else %} + submission_date = @submission_date + {% endif %} + GROUP BY + client_id, + sample_id +), +unioned AS ( + SELECT + *, + 'new_profile' AS source_ping + FROM + new_profile_ping + UNION ALL + SELECT + *, + 'shutdown' AS source_ping + FROM + shutdown_ping + UNION ALL + SELECT + *, + 'main' AS source_ping + FROM + main_ping +), +-- The next CTE unions all reported dates from all pings. +-- It's required to find the first and second seen dates. +dates_with_reporting_ping AS ( + SELECT + client_id, + ARRAY_CONCAT( + ARRAY_AGG( + STRUCT( + all_dates AS value, + unioned.source_ping AS value_source, + all_dates AS value_date + ) IGNORE NULLS + ) + ) AS seen_dates + FROM + unioned + LEFT JOIN + UNNEST(all_dates) AS all_dates + GROUP BY + client_id +), +-- The next CTE returns the first_seen_date and reporting ping. +-- The timestamp is also required to retrieve the first_seen attributes. +first_seen_date AS ( + SELECT + client_id, + DATE(MIN(first_seen_timestamp)) AS first_seen_date, + MIN(first_seen_timestamp) AS first_seen_timestamp, + ANY_VALUE(source_ping HAVING MIN first_seen_timestamp) AS first_seen_source_ping, + FROM + unioned + GROUP BY + client_id +), +-- The next CTE returns the second_seen_date. It finds the next date after first_seen_date +-- as reported by the main ping. Further dates reported by other pings are skipped. +-- The source ping for second_seen_date is always the main ping. +second_seen_date AS ( + SELECT + client_id, + IF( + ARRAY_LENGTH(ARRAY_AGG(seen_dates)) > 1, + ARRAY_AGG(seen_dates ORDER BY value_date ASC)[SAFE_OFFSET(1)], + NULL + ) AS second_seen_date + FROM + dates_with_reporting_ping + LEFT JOIN + UNNEST(seen_dates) AS seen_dates + WHERE + seen_dates.value_source = 'main' + GROUP BY + client_id +), +-- The next CTE returns the pings ever reported by each client +-- Different from other attributes, this data is updated daily when it's NULL, +-- so it's not limited to the first_seen_date. 
+reported_pings AS ( + SELECT + client_id, + 'main' IN UNNEST(ARRAY_AGG(source_ping)) AS reported_main_ping, + 'new_profile' IN UNNEST(ARRAY_AGG(source_ping)) AS reported_new_profile_ping, + 'shutdown' IN UNNEST(ARRAY_AGG(source_ping)) AS reported_shutdown_ping + FROM + unioned + GROUP BY + client_id +), +_current AS ( + SELECT + unioned.client_id AS client_id, + unioned.sample_id AS sample_id, + fsd.first_seen_date AS first_seen_date, + ssd.second_seen_date.value AS second_seen_date, + unioned.* EXCEPT (client_id, sample_id, first_seen_timestamp, all_dates, source_ping), + STRUCT( + fsd.first_seen_source_ping AS first_seen_date_source_ping, + pings.reported_main_ping AS reported_main_ping, + pings.reported_new_profile_ping AS reported_new_profile_ping, + pings.reported_shutdown_ping AS reported_shutdown_ping + ) AS metadata + FROM + unioned + INNER JOIN + first_seen_date AS fsd + USING + (client_id, first_seen_timestamp) + LEFT JOIN + second_seen_date AS ssd + USING + (client_id) + LEFT JOIN + reported_pings AS pings + USING + (client_id) +), +_previous AS ( + SELECT + * + FROM + `moz-fx-data-shared-prod.telemetry_derived.clients_first_seen_v2` +) +SELECT + {% if is_init() %} + IF(_previous.client_id IS NULL, _current, _previous).* + {% else %} + -- For the daily update: + -- The reported ping status in the metadata is updated when it's NULL. + -- The second_seen_date is updated when it's NULL and only if there is a + -- main ping reported on the submission_date. + -- Every other attribute remains as reported on the first_seen_date. + IF(_previous.client_id IS NULL, _current, _previous).* REPLACE ( + IF( + _previous.first_seen_date IS NOT NULL + AND _previous.second_seen_date IS NULL + AND _current.client_id IS NOT NULL + AND _current.metadata.reported_main_ping, + @submission_date, + _previous.second_seen_date + ) AS second_seen_date, + ( + SELECT AS STRUCT + IF(_previous.client_id IS NULL, _current, _previous).metadata.* REPLACE ( + IF( + _previous.client_id IS NULL + OR _previous.metadata.reported_main_ping IS FALSE + AND _current.metadata.reported_main_ping IS TRUE, + _current.metadata.reported_main_ping, + _previous.metadata.reported_main_ping + ) AS reported_main_ping, + IF( + _previous.client_id IS NULL + OR _previous.metadata.reported_new_profile_ping IS FALSE + AND _current.metadata.reported_new_profile_ping IS TRUE, + _current.metadata.reported_new_profile_ping, + _previous.metadata.reported_new_profile_ping + ) AS reported_new_profile_ping, + IF( + _previous.client_id IS NULL + OR _previous.metadata.reported_shutdown_ping IS FALSE + AND _current.metadata.reported_shutdown_ping IS TRUE, + _current.metadata.reported_shutdown_ping, + _previous.metadata.reported_shutdown_ping + ) AS reported_shutdown_ping + ) + ) AS metadata + ) + {% endif %} +FROM + _previous +FULL JOIN + _current +USING + (client_id)
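+-- Illustrative summary (not part of the generated SQL): the FULL JOIN above
+-- merges the newly computed rows (_current) with the existing table
+-- (_previous). In init mode an existing row wins and otherwise the new row is
+-- taken, so a fresh table is populated from _current (prefixed with the
+-- INSERT INTO wrapper when run in parallel). In the daily mode the same rule
+-- applies, except that second_seen_date is filled with @submission_date when it
+-- was still NULL and the client reported a main ping that day, and the
+-- metadata.reported_*_ping flags are promoted to TRUE when the current day
+-- reports a ping type the stored row had as FALSE; every other attribute stays
+-- as captured on the first_seen_date.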