Anna Scholtz 2022-07-22 12:04:48 -07:00
Parent b5da28a02c
Commit a54e3f7309
10 changed files with 56 additions and 353 deletions

View file

@ -24,7 +24,6 @@ from ..cli.utils import (
is_authenticated,
is_valid_project,
paths_matching_name_pattern,
parallelism_option,
project_id_option,
respect_dryrun_skip_option,
sql_dir_option,
@ -687,7 +686,7 @@ def backfill(
)
@click.argument("name")
@sql_dir_option
@project_id_option
@project_id_option()
@click.option(
"--public_project_id",
"--public-project-id",
@ -712,7 +711,6 @@ def backfill(
+ "If not set, determines destination dataset based on query."
),
)
@parallelism_option
@click.pass_context
def run(
ctx,
@ -723,6 +721,7 @@ def run(
destination_table,
dataset_id,
):
"""Run a query."""
if not is_authenticated():
click.echo(
"Authentication to GCP required. Run `gcloud auth login` "
@ -812,7 +811,6 @@ def run(
@click.argument(
"query_dir",
type=click.Path(file_okay=False),
help="Path to the query directory that contains part*.sql files",
)
@click.option(
"--using",
@ -830,8 +828,8 @@ def run(
"--dataset-id",
help="Default dataset, if not specified all tables must be qualified with dataset",
)
@project_id_option
@temp_dataset_option
@project_id_option()
@temp_dataset_option()
@click.option(
"--destination_table",
required=True,
@ -849,13 +847,14 @@ def run(
)
@click.option(
"--dry_run",
action="store_true",
is_flag=True,
default=False,
help="Print bytes that would be processed for each part and don't run queries",
)
@click.option(
"--parameter",
"--parameters",
action="append",
multiple=True,
default=[],
type=lambda p: bigquery.ScalarQueryParameter(*p.split(":", 2)),
metavar="NAME:TYPE:VALUE",
@ -863,9 +862,8 @@ def run(
)
@click.option(
"--priority",
choices=["BATCH", "INTERACTIVE"],
default="INTERACTIVE",
type=str.upper,
type=click.Choice(["BATCH", "INTERACTIVE"]),
help=(
"Priority for BigQuery query jobs; BATCH priority will significantly slow "
"down queries if reserved slots are not enabled for the billing project; "
@ -875,16 +873,18 @@ def run(
@click.option(
"--schema_update_option",
"--schema_update_options",
action="append",
choices=[
bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION,
# Airflow passes an empty string when the field addition date doesn't
# match the run date.
# See https://github.com/mozilla/telemetry-airflow/blob/
# e49fa7e6b3f5ec562dd248d257770c2303cf0cba/dags/utils/gcp.py#L515
"",
],
multiple=True,
type=click.Choice(
[
bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION,
# Airflow passes an empty string when the field addition date doesn't
# match the run date.
# See https://github.com/mozilla/telemetry-airflow/blob/
# e49fa7e6b3f5ec562dd248d257770c2303cf0cba/dags/utils/gcp.py#L515
"",
]
),
default=[],
help="Optional options for updating the schema.",
)
@ -903,6 +903,7 @@ def run_multipart(
priority,
schema_update_options,
):
"""Run a multipart query."""
if dataset_id is not None and "." not in dataset_id and project_id is not None:
dataset_id = f"{project_id}.{dataset_id}"
if "." not in destination_table and dataset_id is not None:
@ -963,7 +964,7 @@ def run_multipart(
def _run_part(
client, part, query_dir, temp_dataset, dataset_id, dry_run, parameters, priority
):
"""Runs a query part."""
"""Run a query part."""
with open(os.path.join(query_dir, part)) as sql_file:
query = sql_file.read()
job_config = bigquery.QueryJobConfig(
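The hunks above replace argparse-style keyword arguments with their Click equivalents: action="store_true" becomes is_flag=True, action="append" becomes multiple=True, choices=[...] becomes type=click.Choice([...]), and shared decorators such as project_id_option are now called as factories (@project_id_option()). A minimal standalone sketch of that mapping — not code from this commit; the command name and option values are made up for illustration:

import click


@click.command()
@click.option("--dry_run", is_flag=True, default=False)  # argparse: action="store_true"
@click.option("--parameter", "parameters", multiple=True, metavar="NAME:TYPE:VALUE")  # argparse: action="append", dest="parameters"
@click.option("--priority", type=click.Choice(["BATCH", "INTERACTIVE"]), default="INTERACTIVE")  # argparse: choices=[...], type=str.upper
def example(dry_run, parameters, priority):
    """Echo parsed options to show the Click equivalents of the old argparse arguments."""
    click.echo(f"dry_run={dry_run} parameters={parameters} priority={priority}")


if __name__ == "__main__":
    example()

Because of multiple=True, repeated --parameter flags arrive as a tuple, matching the plural parameters argument the commands above receive.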

View file

@ -15,6 +15,7 @@ QUERY_FILE_RE = re.compile(
r"^.*/([a-zA-Z0-9-]+)/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+(_v[0-9]+)?)/"
r"(?:query\.sql|part1\.sql|script\.sql|query\.py|view\.sql)$"
)
TEST_PROJECT = "bigquery-etl-integration-test"
def is_valid_dir(ctx, param, value):
@ -43,7 +44,9 @@ def is_authenticated(project_id=None):
def is_valid_project(ctx, param, value):
"""Check if the provided project_id corresponds to an existing project."""
if value is None or value in [Path(p).name for p in project_dirs()]:
if value is None or value in [Path(p).name for p in project_dirs()] + [
TEST_PROJECT
]:
return value
raise click.BadParameter(f"Invalid project {value}")
@ -66,8 +69,7 @@ def paths_matching_name_pattern(pattern, sql_path, project_id, files=("*.sql")):
for file in files:
matching_files.extend(Path(root).rglob(file))
elif os.path.isfile(pattern):
for file in files:
matching_files.extend(Path(sql_path).rglob(file))
matching_files.append(pattern)
else:
sql_path = Path(sql_path)
if project_id is not None:

View file

@ -1,191 +0,0 @@
#!/usr/bin/env python3
"""
Writes multiple queries to temporary tables and then joins the results.
This is useful for queries that BigQuery deems too complex to run, usually due
to using a large number of subqueries; this pattern allows you to materialize
subsets of columns in multiple different queries so that they stay under the
complexity limit, and then join those results to generate a final wide result.
The query files must be in the same directory and all be prefixed with `part`.
"""
import os.path
from argparse import ArgumentParser
from multiprocessing.pool import ThreadPool
from google.cloud import bigquery
from .util import standard_args
from .util.bigquery_id import sql_table_id
def dirpath(string):
"""Path to a dir that must exist."""
if not os.path.isdir(string):
raise ValueError()
return string
parser = ArgumentParser(description=__doc__)
parser.add_argument(
"--using",
default="document_id",
help="comma separated list of join columns to use when combining results",
)
parser.add_argument(
"--parallelism",
default=4,
type=int,
help="Maximum number of queries to execute concurrently",
)
parser.add_argument(
"--dataset_id",
help="Default dataset, if not specified all tables must be qualified with dataset",
)
parser.add_argument(
"--project_id", help="Default project, if not specified the sdk will determine one"
)
standard_args.add_temp_dataset(parser)
parser.add_argument(
"--destination_table",
required=True,
help="table where combined results will be written",
)
parser.add_argument(
"--time_partitioning_field",
type=lambda f: bigquery.TimePartitioning(field=f),
help="time partition field on the destination table",
)
parser.add_argument(
"--clustering_fields",
type=lambda f: f.split(","),
help="comma separated list of clustering fields on the destination table",
)
parser.add_argument(
"--dry_run",
action="store_true",
help="Print bytes that would be processed for each part and don't run queries",
)
parser.add_argument(
"--parameter",
action="append",
default=[],
dest="parameters",
type=lambda p: bigquery.ScalarQueryParameter(*p.split(":", 2)),
metavar="NAME:TYPE:VALUE",
help="query parameter(s) to pass when running parts",
)
parser.add_argument(
"--priority",
choices=["BATCH", "INTERACTIVE"],
default="INTERACTIVE",
type=str.upper,
help=(
"Priority for BigQuery query jobs; BATCH priority will significantly slow "
"down queries if reserved slots are not enabled for the billing project; "
"defaults to INTERACTIVE"
),
)
parser.add_argument(
"query_dir",
type=dirpath,
help="Path to the query directory that contains part*.sql files",
)
parser.add_argument(
"--schema_update_option",
action="append",
choices=[
bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION,
# Airflow passes an empty string when the field addition date doesn't
# match the run date.
# See https://github.com/mozilla/telemetry-airflow/blob/
# e49fa7e6b3f5ec562dd248d257770c2303cf0cba/dags/utils/gcp.py#L515
"",
],
default=[],
dest="schema_update_options",
help="Optional options for updating the schema.",
)
def _run_part(client, part, args):
with open(os.path.join(args.query_dir, part)) as sql_file:
query = sql_file.read()
job_config = bigquery.QueryJobConfig(
destination=args.temp_dataset.temp_table(),
default_dataset=args.dataset_id,
use_legacy_sql=False,
dry_run=args.dry_run,
query_parameters=args.parameters,
priority=args.priority,
allow_large_results=True,
)
job = client.query(query=query, job_config=job_config)
if job.dry_run:
print(f"Would process {job.total_bytes_processed:,d} bytes for {part}")
else:
job.result()
print(f"Processed {job.total_bytes_processed:,d} bytes for {part}")
return part, job
def main():
"""Run multipart query."""
args = parser.parse_args()
if (
args.dataset_id is not None
and "." not in args.dataset_id
and args.project_id is not None
):
args.dataset_id = f"{args.project_id}.{args.dataset_id}"
if "." not in args.destination_table and args.dataset_id is not None:
args.destination_table = f"{args.dataset_id}.{args.destination_table}"
client = bigquery.Client(args.project_id)
with ThreadPool(args.parallelism) as pool:
parts = pool.starmap(
_run_part,
[
(client, part, args)
for part in sorted(next(os.walk(args.query_dir))[2])
if part.startswith("part") and part.endswith(".sql")
],
chunksize=1,
)
if not args.dry_run:
total_bytes = sum(job.total_bytes_processed for _, job in parts)
query = (
f"SELECT\n *\nFROM\n `{sql_table_id(parts[0][1].destination)}`"
+ "".join(
f"\nFULL JOIN\n `{sql_table_id(job.destination)}`"
f"\nUSING\n ({args.using})"
for _, job in parts[1:]
)
)
try:
job = client.query(
query=query,
job_config=bigquery.QueryJobConfig(
destination=args.destination_table,
time_partitioning=args.time_partitioning_field,
clustering_fields=args.clustering_fields,
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
use_legacy_sql=False,
priority=args.priority,
schema_update_options=args.schema_update_options,
),
)
job.result()
print(f"Processed {job.total_bytes_processed:,d} bytes to combine results")
total_bytes += job.total_bytes_processed
print(f"Processed {total_bytes:,d} bytes in total")
finally:
for _, job in parts:
client.delete_table(sql_table_id(job.destination).split("$")[0])
print(f"Deleted {len(parts)} temporary tables")
if __name__ == "__main__":
main()
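Everything in this deleted module is superseded by the Click run_multipart command in the first file above. The pattern its docstring describes — run each part*.sql into a temp table, then combine the results — comes down to the FULL JOIN query assembled in main(); a standalone sketch of just that step, with hypothetical temp-table names:

def build_combining_query(part_tables, using="document_id"):
    """Build the FULL JOIN ... USING query that combines part*.sql temp tables,
    mirroring the query string assembled in main() above."""
    query = f"SELECT\n  *\nFROM\n  `{part_tables[0]}`"
    for table in part_tables[1:]:
        query += f"\nFULL JOIN\n  `{table}`\nUSING\n  ({using})"
    return query


# hypothetical temp-table names standing in for part1.sql and part2.sql results
print(build_combining_query(["project.tmp_dataset.part1_abc123", "project.tmp_dataset.part2_def456"]))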

View file

@ -1,107 +0,0 @@
"""
Runs SQL queries and writes results to destination tables.
When executing a query associated metadata is parsed to determine whether
results should be written to a corresponding public dataset.
"""
import re
import subprocess
import sys
from argparse import ArgumentParser
import yaml
from bigquery_etl.metadata.parse_metadata import Metadata
from bigquery_etl.metadata.validate_metadata import validate_public_data
DESTINATION_TABLE_RE = re.compile(r"^[a-zA-Z0-9_$]{0,1024}$")
PUBLIC_PROJECT_ID = "mozilla-public-data"
parser = ArgumentParser(description=__doc__)
parser.add_argument(
"--public_project_id",
default=PUBLIC_PROJECT_ID,
help="Project with publicly accessible data",
)
parser.add_argument(
"--destination_table", help="Destination table name results are written to"
)
parser.add_argument("--dataset_id", help="Destination dataset")
parser.add_argument("--query_file", help="File path to query to be executed")
def run(
query_file,
dataset_id,
destination_table,
query_arguments,
public_project_id=PUBLIC_PROJECT_ID,
):
"""Execute bq to run a query."""
if dataset_id is not None:
# dataset ID was parsed by argparse but needs to be passed as parameter
# when running the query
query_arguments.append("--dataset_id={}".format(dataset_id))
use_public_table = False
try:
metadata = Metadata.of_query_file(query_file)
if metadata.is_public_bigquery():
if not validate_public_data(metadata, query_file):
sys.exit(1)
# change the destination table to write results to the public dataset;
# a view to the public table in the internal dataset is created
# when CI runs
if (
dataset_id is not None
and destination_table is not None
and re.match(DESTINATION_TABLE_RE, destination_table)
):
destination_table = "{}:{}.{}".format(
public_project_id, dataset_id, destination_table
)
query_arguments.append(
"--destination_table={}".format(destination_table)
)
use_public_table = True
else:
print(
"ERROR: Cannot run public dataset query. Parameters"
" --destination_table=<table without dataset ID> and"
" --dataset_id=<dataset> required"
)
sys.exit(1)
except yaml.YAMLError as e:
print(e)
sys.exit(1)
except FileNotFoundError:
print("INFO: No metadata.yaml found for {}", query_file)
if not use_public_table and destination_table is not None:
# destination table was parsed by argparse, however if it wasn't modified to
# point to a public table it needs to be passed as parameter for the query
query_arguments.append("--destination_table={}".format(destination_table))
with open(query_file) as query_stream:
# run the query as shell command so that passed parameters can be used as is
subprocess.check_call(["bq"] + query_arguments, stdin=query_stream)
def main():
"""Run query."""
args, query_arguments = parser.parse_known_args()
run(
args.query_file,
args.dataset_id,
args.destination_table,
query_arguments,
args.public_project_id,
)
if __name__ == "__main__":
main()
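This module's public-dataset handling moves into the Click run command in the first file (note the --public_project_id option there). The essential step is rewriting the destination so results land in the public project when metadata marks a query as public; a standalone sketch, with hypothetical dataset and table names:

import re

DESTINATION_TABLE_RE = re.compile(r"^[a-zA-Z0-9_$]{0,1024}$")
PUBLIC_PROJECT_ID = "mozilla-public-data"


def public_destination(dataset_id, destination_table, public_project_id=PUBLIC_PROJECT_ID):
    """Return the bq-style destination for a public query, or None if dataset/table are unusable."""
    if dataset_id and destination_table and DESTINATION_TABLE_RE.match(destination_table):
        # bq expects project:dataset.table for --destination_table
        return f"{public_project_id}:{dataset_id}.{destination_table}"
    return None


# hypothetical dataset and table names, for illustration only
print(public_destination("telemetry_derived", "example_public_v1"))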

View file

@ -4,7 +4,6 @@ import os
import random
import re
import string
import logging
import warnings
from typing import List
from pathlib import Path

View file

@ -31,7 +31,7 @@ elif [ "$1" = "query" ]; then
# query [options] FILE
# we dispatch to a script that inspects metadata and emulates the following call:
# bq query [options] < FILE
exec script/bqetl query run --query_file="${@: -1}" "${@:1:$#-1}"
exec script/bqetl query run "${@: -1}" "${@:1:$#-1}"
elif [ "$XCOM_PUSH" = "true" ]; then
# KubernetesPodOperator will extract the contents of /airflow/xcom/return.json as an xcom
# if the xcom_push parameter is true

View file

@ -1,12 +0,0 @@
#!/bin/sh
# Writes multiple queries to temporary tables and then joins the results.
# This is useful for queries that BigQuery deems too complex to run, usually due
# to using a large number of subqueries; this pattern allows you to materialize
# subsets of columns in multiple different queries so that they stay under the
# complexity limit, and then join those results to generate a final wide result.
# The query files must be in the same directory and all be prefixed with `part`.
exec python3 -m bigquery_etl.run_multipart_query "$@"

View file

@ -1,5 +0,0 @@
#!/bin/sh
# Run a SQL query.
exec python3 -m bigquery_etl.run_query "$@"

View file

@ -11,7 +11,7 @@ ENTRYPOINT_SCRIPT = Path(__file__).parent.parent / "script" / "entrypoint"
class TestEntrypoint:
@pytest.mark.integration
def test_run_query(self, tmp_path, project_id):
query_file_path = tmp_path / "sql" / "test" / "query_v1"
query_file_path = tmp_path / "sql" / project_id / "query_v1"
os.makedirs(query_file_path)
query_file = query_file_path / "query.sql"
@ -43,7 +43,7 @@ class TestEntrypoint:
def test_run_query_write_to_table(
self, tmp_path, bigquery_client, project_id, temporary_dataset
):
query_file_path = tmp_path / "sql" / "test" / "query_v1"
query_file_path = tmp_path / "sql" / project_id / "query_v1"
os.makedirs(query_file_path)
query_file = query_file_path / "query.sql"
@ -87,5 +87,5 @@ class TestEntrypoint:
@pytest.mark.integration
def test_run_query_no_query_file(self):
with pytest.raises(subprocess.CalledProcessError):
subprocess.check_call([ENTRYPOINT_SCRIPT, "query"])
result = subprocess.check_output([ENTRYPOINT_SCRIPT, "query"])
assert b"No files matching:" in result

View file

@ -1,10 +1,11 @@
import os
from unittest.mock import patch
import pytest
from click.testing import CliRunner
import yaml
from bigquery_etl.run_query import run
from bigquery_etl.cli.query import run
class TestRunQuery:
@ -23,9 +24,15 @@ class TestRunQuery:
metadata_file = query_file_path / "metadata.yaml"
metadata_file.write_text(yaml.dump(metadata_conf))
runner = CliRunner()
with patch("subprocess.check_call") as mock_call:
mock_call.return_value = True
run(query_file, "test", "query_v1", [])
result = runner.invoke(
run,
[str(query_file), "--dataset_id=test", "--destination_table=query_v1"],
)
assert result.exit_code == 0
assert mock_call.call_args.args == (
["bq", "--dataset_id=test", "--destination_table=query_v1"],
@ -50,7 +57,13 @@ class TestRunQuery:
with patch("subprocess.check_call") as mock_call:
mock_call.return_value = True
run(query_file, "test", "query_v1", [])
runner = CliRunner()
result = runner.invoke(
run,
[str(query_file), "--dataset_id=test", "--destination_table=query_v1"],
)
assert result.exit_code == 0
assert mock_call.call_args.args == (
[
@ -76,11 +89,14 @@ class TestRunQuery:
metadata_file = query_file_path / "metadata.yaml"
metadata_file.write_text(yaml.dump(metadata_conf))
runner = CliRunner()
with patch("subprocess.check_call") as mock_call:
mock_call.return_value = True
with pytest.raises(SystemExit):
run(query_file, None, "query_v1", [])
result = runner.invoke(
run, [str(query_file), "--destination_table=query_v1"]
)
assert result.exit_code == 1
with pytest.raises(SystemExit):
run(query_file, "test", None, [])
result = runner.invoke(run, [str(query_file), "--dataset_id=test"])
assert result.exit_code == 1