diff --git a/bigquery_etl/cli/query.py b/bigquery_etl/cli/query.py
index 001e93b739..3e7b0f9796 100644
--- a/bigquery_etl/cli/query.py
+++ b/bigquery_etl/cli/query.py
@@ -24,7 +24,6 @@ from ..cli.utils import (
     is_authenticated,
     is_valid_project,
     paths_matching_name_pattern,
-    parallelism_option,
     project_id_option,
     respect_dryrun_skip_option,
     sql_dir_option,
@@ -687,7 +686,7 @@ def backfill(
 )
 @click.argument("name")
 @sql_dir_option
-@project_id_option
+@project_id_option()
 @click.option(
     "--public_project_id",
     "--public-project-id",
@@ -712,7 +711,6 @@ def backfill(
         + "If not set, determines destination dataset based on query."
     ),
 )
-@parallelism_option
 @click.pass_context
 def run(
     ctx,
@@ -723,6 +721,7 @@ def run(
     destination_table,
     dataset_id,
 ):
+    """Run a query."""
     if not is_authenticated():
         click.echo(
             "Authentication to GCP required. Run `gcloud auth login` "
@@ -812,7 +811,6 @@ def run(
 @click.argument(
     "query_dir",
     type=click.Path(file_okay=False),
-    help="Path to the query directory that contains part*.sql files",
 )
 @click.option(
     "--using",
@@ -830,8 +828,8 @@ def run(
     "--dataset-id",
     help="Default dataset, if not specified all tables must be qualified with dataset",
 )
-@project_id_option
-@temp_dataset_option
+@project_id_option()
+@temp_dataset_option()
 @click.option(
     "--destination_table",
     required=True,
@@ -849,13 +847,14 @@ def run(
 )
 @click.option(
     "--dry_run",
-    action="store_true",
+    is_flag=True,
+    default=False,
     help="Print bytes that would be processed for each part and don't run queries",
 )
 @click.option(
     "--parameter",
     "--parameters",
-    action="append",
+    multiple=True,
     default=[],
     type=lambda p: bigquery.ScalarQueryParameter(*p.split(":", 2)),
     metavar="NAME:TYPE:VALUE",
@@ -863,9 +862,8 @@ def run(
 )
 @click.option(
     "--priority",
-    choices=["BATCH", "INTERACTIVE"],
     default="INTERACTIVE",
-    type=str.upper,
+    type=click.Choice(["BATCH", "INTERACTIVE"]),
     help=(
         "Priority for BigQuery query jobs; BATCH priority will significantly slow "
         "down queries if reserved slots are not enabled for the billing project; "
@@ -875,16 +873,18 @@ def run(
 @click.option(
     "--schema_update_option",
     "--schema_update_options",
-    action="append",
-    choices=[
-        bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
-        bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION,
-        # Airflow passes an empty string when the field addition date doesn't
-        # match the run date.
-        # See https://github.com/mozilla/telemetry-airflow/blob/
-        # e49fa7e6b3f5ec562dd248d257770c2303cf0cba/dags/utils/gcp.py#L515
-        "",
-    ],
+    multiple=True,
+    type=click.Choice(
+        [
+            bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
+            bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION,
+            # Airflow passes an empty string when the field addition date doesn't
+            # match the run date.
+            # See https://github.com/mozilla/telemetry-airflow/blob/
+            # e49fa7e6b3f5ec562dd248d257770c2303cf0cba/dags/utils/gcp.py#L515
+            "",
+        ]
+    ),
     default=[],
     help="Optional options for updating the schema.",
 )
@@ -903,6 +903,7 @@ def run_multipart(
     priority,
     schema_update_options,
 ):
+    """Run a multipart query."""
     if dataset_id is not None and "." not in dataset_id and project_id is not None:
         dataset_id = f"{project_id}.{dataset_id}"
     if "." not in destination_table and dataset_id is not None:
@@ -963,7 +964,7 @@ def run_multipart(
 def _run_part(
     client, part, query_dir, temp_dataset, dataset_id, dry_run, parameters, priority
 ):
-    """Runs a query part."""
+    """Run a query part."""
     with open(os.path.join(query_dir, part)) as sql_file:
         query = sql_file.read()
     job_config = bigquery.QueryJobConfig(
diff --git a/bigquery_etl/cli/utils.py b/bigquery_etl/cli/utils.py
index fdb5f89b38..0c7a32aeb4 100644
--- a/bigquery_etl/cli/utils.py
+++ b/bigquery_etl/cli/utils.py
@@ -15,6 +15,7 @@ QUERY_FILE_RE = re.compile(
     r"^.*/([a-zA-Z0-9-]+)/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+(_v[0-9]+)?)/"
     r"(?:query\.sql|part1\.sql|script\.sql|query\.py|view\.sql)$"
 )
+TEST_PROJECT = "bigquery-etl-integration-test"
 
 
 def is_valid_dir(ctx, param, value):
@@ -43,7 +44,9 @@ def is_authenticated(project_id=None):
 
 def is_valid_project(ctx, param, value):
     """Check if the provided project_id corresponds to an existing project."""
-    if value is None or value in [Path(p).name for p in project_dirs()]:
+    if value is None or value in [Path(p).name for p in project_dirs()] + [
+        TEST_PROJECT
+    ]:
         return value
     raise click.BadParameter(f"Invalid project {value}")
 
@@ -66,8 +69,7 @@ def paths_matching_name_pattern(pattern, sql_path, project_id, files=("*.sql")):
             for file in files:
                 matching_files.extend(Path(root).rglob(file))
     elif os.path.isfile(pattern):
-        for file in files:
-            matching_files.extend(Path(sql_path).rglob(file))
+        matching_files.append(pattern)
     else:
         sql_path = Path(sql_path)
         if project_id is not None:
diff --git a/bigquery_etl/run_multipart_query.py b/bigquery_etl/run_multipart_query.py
deleted file mode 100644
index b9fc60dc54..0000000000
--- a/bigquery_etl/run_multipart_query.py
+++ /dev/null
@@ -1,191 +0,0 @@
-#!/usr/bin/env python3
-
-"""
-Writes multiple queries to temporary tables and then joins the results.
-
-This is useful for queries that BigQuery deems too complex to run, usually due
-to using a large number of subqueries; this pattern allows you to materialize
-subsets of columns in multiple different queries so that they stay under the
-complexity limit, and then join those results to generate a final wide result.
-
-The query files must be in the same directory and all be prefixed with `part`.
-"""
-
-import os.path
-from argparse import ArgumentParser
-from multiprocessing.pool import ThreadPool
-
-from google.cloud import bigquery
-
-from .util import standard_args
-from .util.bigquery_id import sql_table_id
-
-
-def dirpath(string):
-    """Path to a dir that must exist."""
-    if not os.path.isdir(string):
-        raise ValueError()
-    return string
-
-
-parser = ArgumentParser(description=__doc__)
-parser.add_argument(
-    "--using",
-    default="document_id",
-    help="comma separated list of join columns to use when combining results",
-)
-parser.add_argument(
-    "--parallelism",
-    default=4,
-    type=int,
-    help="Maximum number of queries to execute concurrently",
-)
-parser.add_argument(
-    "--dataset_id",
-    help="Default dataset, if not specified all tables must be qualified with dataset",
-)
-parser.add_argument(
-    "--project_id", help="Default project, if not specified the sdk will determine one"
-)
-standard_args.add_temp_dataset(parser)
-parser.add_argument(
-    "--destination_table",
-    required=True,
-    help="table where combined results will be written",
-)
-parser.add_argument(
-    "--time_partitioning_field",
-    type=lambda f: bigquery.TimePartitioning(field=f),
-    help="time partition field on the destination table",
-)
-parser.add_argument(
-    "--clustering_fields",
-    type=lambda f: f.split(","),
-    help="comma separated list of clustering fields on the destination table",
-)
-parser.add_argument(
-    "--dry_run",
-    action="store_true",
-    help="Print bytes that would be processed for each part and don't run queries",
-)
-parser.add_argument(
-    "--parameter",
-    action="append",
-    default=[],
-    dest="parameters",
-    type=lambda p: bigquery.ScalarQueryParameter(*p.split(":", 2)),
-    metavar="NAME:TYPE:VALUE",
-    help="query parameter(s) to pass when running parts",
-)
-parser.add_argument(
-    "--priority",
-    choices=["BATCH", "INTERACTIVE"],
-    default="INTERACTIVE",
-    type=str.upper,
-    help=(
-        "Priority for BigQuery query jobs; BATCH priority will significantly slow "
-        "down queries if reserved slots are not enabled for the billing project; "
-        "defaults to INTERACTIVE"
-    ),
-)
-parser.add_argument(
-    "query_dir",
-    type=dirpath,
-    help="Path to the query directory that contains part*.sql files",
-)
-parser.add_argument(
-    "--schema_update_option",
-    action="append",
-    choices=[
-        bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
-        bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION,
-        # Airflow passes an empty string when the field addition date doesn't
-        # match the run date.
-        # See https://github.com/mozilla/telemetry-airflow/blob/
-        # e49fa7e6b3f5ec562dd248d257770c2303cf0cba/dags/utils/gcp.py#L515
-        "",
-    ],
-    default=[],
-    dest="schema_update_options",
-    help="Optional options for updating the schema.",
-)
-
-
-def _run_part(client, part, args):
-    with open(os.path.join(args.query_dir, part)) as sql_file:
-        query = sql_file.read()
-    job_config = bigquery.QueryJobConfig(
-        destination=args.temp_dataset.temp_table(),
-        default_dataset=args.dataset_id,
-        use_legacy_sql=False,
-        dry_run=args.dry_run,
-        query_parameters=args.parameters,
-        priority=args.priority,
-        allow_large_results=True,
-    )
-    job = client.query(query=query, job_config=job_config)
-    if job.dry_run:
-        print(f"Would process {job.total_bytes_processed:,d} bytes for {part}")
-    else:
-        job.result()
-        print(f"Processed {job.total_bytes_processed:,d} bytes for {part}")
-    return part, job
-
-
-def main():
-    """Run multipart query."""
-    args = parser.parse_args()
-    if (
-        args.dataset_id is not None
-        and "." not in args.dataset_id
-        and args.project_id is not None
-    ):
-        args.dataset_id = f"{args.project_id}.{args.dataset_id}"
-    if "." not in args.destination_table and args.dataset_id is not None:
-        args.destination_table = f"{args.dataset_id}.{args.destination_table}"
-    client = bigquery.Client(args.project_id)
-    with ThreadPool(args.parallelism) as pool:
-        parts = pool.starmap(
-            _run_part,
-            [
-                (client, part, args)
-                for part in sorted(next(os.walk(args.query_dir))[2])
-                if part.startswith("part") and part.endswith(".sql")
-            ],
-            chunksize=1,
-        )
-    if not args.dry_run:
-        total_bytes = sum(job.total_bytes_processed for _, job in parts)
-        query = (
-            f"SELECT\n  *\nFROM\n  `{sql_table_id(parts[0][1].destination)}`"
-            + "".join(
-                f"\nFULL JOIN\n  `{sql_table_id(job.destination)}`"
-                f"\nUSING\n  ({args.using})"
-                for _, job in parts[1:]
-            )
-        )
-        try:
-            job = client.query(
-                query=query,
-                job_config=bigquery.QueryJobConfig(
-                    destination=args.destination_table,
-                    time_partitioning=args.time_partitioning_field,
-                    clustering_fields=args.clustering_fields,
-                    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
-                    use_legacy_sql=False,
-                    priority=args.priority,
-                    schema_update_options=args.schema_update_options,
-                ),
-            )
-            job.result()
-            print(f"Processed {job.total_bytes_processed:,d} bytes to combine results")
-            total_bytes += job.total_bytes_processed
-            print(f"Processed {total_bytes:,d} bytes in total")
-        finally:
-            for _, job in parts:
-                client.delete_table(sql_table_id(job.destination).split("$")[0])
-            print(f"Deleted {len(parts)} temporary tables")
-
-
-if __name__ == "__main__":
-    main()
diff --git a/bigquery_etl/run_query.py b/bigquery_etl/run_query.py
deleted file mode 100644
index 65e667be4c..0000000000
--- a/bigquery_etl/run_query.py
+++ /dev/null
@@ -1,107 +0,0 @@
-"""
-Runs SQL queries and writes results to destination tables.
-
-When executing a query associated metadata is parsed to determine whether
-results should be written to a corresponding public dataset.
-"""
-
-import re
-import subprocess
-import sys
-from argparse import ArgumentParser
-
-import yaml
-
-from bigquery_etl.metadata.parse_metadata import Metadata
-from bigquery_etl.metadata.validate_metadata import validate_public_data
-
-DESTINATION_TABLE_RE = re.compile(r"^[a-zA-Z0-9_$]{0,1024}$")
-PUBLIC_PROJECT_ID = "mozilla-public-data"
-
-
-parser = ArgumentParser(description=__doc__)
-parser.add_argument(
-    "--public_project_id",
-    default=PUBLIC_PROJECT_ID,
-    help="Project with publicly accessible data",
-)
-parser.add_argument(
-    "--destination_table", help="Destination table name results are written to"
-)
-parser.add_argument("--dataset_id", help="Destination dataset")
-parser.add_argument("--query_file", help="File path to query to be executed")
-
-
-def run(
-    query_file,
-    dataset_id,
-    destination_table,
-    query_arguments,
-    public_project_id=PUBLIC_PROJECT_ID,
-):
-    """Execute bq to run a query."""
-    if dataset_id is not None:
-        # dataset ID was parsed by argparse but needs to be passed as parameter
-        # when running the query
-        query_arguments.append("--dataset_id={}".format(dataset_id))
-
-    use_public_table = False
-
-    try:
-        metadata = Metadata.of_query_file(query_file)
-        if metadata.is_public_bigquery():
-            if not validate_public_data(metadata, query_file):
-                sys.exit(1)
-
-            # change the destination table to write results to the public dataset;
-            # a view to the public table in the internal dataset is created
-            # when CI runs
-            if (
-                dataset_id is not None
-                and destination_table is not None
-                and re.match(DESTINATION_TABLE_RE, destination_table)
-            ):
-                destination_table = "{}:{}.{}".format(
-                    public_project_id, dataset_id, destination_table
-                )
-                query_arguments.append(
-                    "--destination_table={}".format(destination_table)
-                )
-                use_public_table = True
-            else:
-                print(
-                    "ERROR: Cannot run public dataset query. Parameters"
-                    " --destination_table=
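
The patch above cuts off partway through the deleted `run_query.py`. As a rough, hedged illustration of how the new click-based `run_multipart` command defined in this diff can be exercised, the sketch below drives it through click's `CliRunner` in dry-run mode. The query directory, project, dataset, and table names are hypothetical placeholders; a real invocation still requires GCP authentication and any options (such as a temp dataset) that have no usable default in this environment.

```python
# Hedged sketch: invoke the click-based run_multipart command via click's test
# runner. All paths, project, dataset, and table names below are made up.
from click.testing import CliRunner

from bigquery_etl.cli.query import run_multipart

runner = CliRunner()
result = runner.invoke(
    run_multipart,
    [
        "sql/moz-fx-data-shared-prod/telemetry_derived/example_v1",  # hypothetical dir with part*.sql files
        "--project_id=moz-fx-data-shared-prod",  # hypothetical billing project
        "--dataset_id=telemetry_derived",  # hypothetical default dataset
        "--destination_table=example_v1",  # required by the command
        "--dry_run",  # now a click flag instead of argparse's action="store_true"
    ],
)
print(result.exit_code)
print(result.output)
```

Because `--dry_run` is now declared with `is_flag=True` and the list-style options use `multiple=True`, the command keeps the same shape Airflow already passes to the old argparse script, which is presumably why the option names were left unchanged.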