Anna Scholtz 2020-07-03 10:59:41 -07:00
Parent 94768fc14e
Commit dfd52b5647
3 changed files with 195 additions and 172 deletions

View file

@@ -2,8 +2,11 @@
from argparse import ArgumentParser
import os
from pathlib import Path
import re
import tempfile
from bigquery_etl.dryrun import dry_run_sql_file
from bigquery_etl.parse_udf import read_udf_dirs, persistent_udf_as_temp
from bigquery_etl.util import standard_args
@@ -24,12 +27,33 @@ parser.add_argument(
standard_args.add_log_level(parser)
def udfs_as_temp_functions(dir):
"""Return a single SQL string with all UDFs as temporary functions."""
if os.path.isdir(dir):
for root, dirs, files in os.walk(dir):
if UDF_FILE in files:
pass
def sql_for_dry_run(file, parsed_udfs, project_dir):
"""
Return the example SQL used for the dry run.
Injects all UDFs the example depends on as temporary functions.
"""
dry_run_sql = ""
with open(file) as sql:
example_sql = sql.read()
# add UDFs that example depends on as temporary functions
for udf, raw_udf in parsed_udfs.items():
if udf in example_sql:
query = "".join(raw_udf.definitions)
dry_run_sql += persistent_udf_as_temp(query, parsed_udfs)
dry_run_sql += example_sql
for udf, _ in parsed_udfs.items():
        # temporary UDFs cannot contain dots, rename UDFs
dry_run_sql = dry_run_sql.replace(udf, udf.replace(".", "_"))
# remove explicit project references
dry_run_sql = dry_run_sql.replace(project_dir + ".", "")
return dry_run_sql
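# (Illustration only, not part of this commit: the two rewrites above applied
# to a made-up example; the UDF and project names are hypothetical.)
#
#   sql = "SELECT moz-fx-data-shared-prod.udf.mode_last(x) FROM t"
#   sql = sql.replace("udf.mode_last", "udf.mode_last".replace(".", "_"))
#   # -> "SELECT moz-fx-data-shared-prod.udf_mode_last(x) FROM t"
#   sql = sql.replace("moz-fx-data-shared-prod" + ".", "")
#   # -> "SELECT udf_mode_last(x) FROM t"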
def main():
@@ -38,32 +62,23 @@ def main():
# parse UDFs
parsed_udfs = read_udf_dirs(*args.project_dirs)
for project_dir in args.project_dirs:
if os.path.isdir(project_dir):
for root, dirs, files in os.walk(project_dir):
if os.path.basename(root) == EXAMPLE_DIR:
for file in files:
with open(os.path.join(root, file)) as sql:
dry_run_sql = ""
example_sql = sql.read()
dry_run_sql = sql_for_dry_run(
os.path.join(root, file), parsed_udfs, project_dir
)
for udf, raw_udf in parsed_udfs.items():
if udf in example_sql:
query = "".join(raw_udf.definitions)
dry_run_sql += persistent_udf_as_temp(query, parsed_udfs)
# store sql in temporary file for dry_run
tmp_dir = Path(tempfile.mkdtemp()) / Path(root)
tmp_dir.mkdir(parents=True, exist_ok=True)
tmp_example_file = tmp_dir / file
tmp_example_file.write_text(dry_run_sql)
dry_run_sql += example_sql
for udf, _ in parsed_udfs.items():
# temporary UDFs cannot contain dots
dry_run_sql = dry_run_sql.replace(udf, udf.replace(".", "_"))
# remove explicit project references
dry_run_sql = dry_run_sql.replace(project_dir + ".", "")
print(dry_run_sql)
dry_run_sql_file(str(tmp_example_file))
if __name__ == "__main__":
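
One detail worth noting in the new main(): the temporary SQL file is written under a directory tree that mirrors root rather than into a flat temp directory. That is presumably because the dry-run helper infers the BigQuery dataset from the file's grandparent directory, so the path structure must survive the copy. A minimal sketch (the path is one of the files listed in bigquery_etl/dryrun.py below):

from os.path import basename, dirname

path = "sql/telemetry_derived/latest_versions/query.sql"
# dataset sent to the dry-run service: the grandparent directory name
assert basename(dirname(dirname(path))) == "telemetry_derived"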

bigquery_etl/dryrun.py (new file, 150 lines)
View file

@@ -0,0 +1,150 @@
"""
Passes all queries defined under sql/ to a Cloud Function that will run the
queries with the dry_run option enabled.
We could provision BigQuery credentials to the CircleCI job to allow it to run
the queries directly, but there is no way to restrict permissions such that
only dry runs can be performed. In order to reduce risk of CI or local users
accidentally running queries during tests and overwriting production data, we
proxy the queries through the dry run service endpoint.
"""
from multiprocessing.pool import ThreadPool
from os.path import basename, dirname
from urllib.request import urlopen, Request
import glob
import json
import sys
DRY_RUN_URL = (
"https://us-central1-moz-fx-data-shared-prod.cloudfunctions.net/bigquery-etl-dryrun"
)
SKIP = {
# Access Denied
"sql/activity_stream/impression_stats_flat/view.sql",
"sql/activity_stream/tile_id_types/view.sql",
"sql/monitoring/document_sample_nonprod_v1/query.sql",
"sql/monitoring/schema_error_counts_v1/view.sql",
"sql/monitoring/structured_error_counts_v1/view.sql",
"sql/pocket/pocket_reach_mau/view.sql",
"sql/telemetry/buildhub2/view.sql",
"sql/firefox_accounts_derived/fxa_content_events_v1/query.sql",
"sql/firefox_accounts_derived/fxa_auth_bounce_events_v1/query.sql",
"sql/firefox_accounts_derived/fxa_auth_events_v1/query.sql",
"sql/firefox_accounts_derived/fxa_oauth_events_v1/query.sql",
"sql/firefox_accounts_derived/fxa_log_auth_events_v1/query.sql",
"sql/firefox_accounts_derived/fxa_log_content_events_v1/query.sql",
"sql/telemetry_derived/addons_daily_v1/query.sql",
"sql/search_derived/search_clients_last_seen_v1/init.sql",
"sql/search_derived/search_clients_last_seen_v1/query.sql",
"sql/search/search_rfm/view.sql",
"sql/search/search_clients_last_seen_v1/view.sql",
"sql/search/search_clients_last_seen/view.sql",
"sql/firefox_accounts/fxa_amplitude_email_clicks/view.sql",
"sql/firefox_accounts_derived/fxa_amplitude_export_v1/query.sql",
"sql/firefox_accounts_derived/fxa_amplitude_user_ids_v1/query.sql",
"sql/firefox_accounts_derived/fxa_amplitude_user_ids_v1/init.sql",
"sql/revenue_derived/client_ltv_v1/query.sql",
"sql/shredder_state/progress/view.sql",
# Already exists (and lacks an "OR REPLACE" clause)
"sql/org_mozilla_firefox_derived/clients_first_seen_v1/init.sql",
"sql/org_mozilla_firefox_derived/clients_last_seen_v1/init.sql",
"sql/org_mozilla_fenix_derived/clients_last_seen_v1/init.sql",
"sql/org_mozilla_vrbrowser_derived/clients_last_seen_v1/init.sql",
"sql/telemetry_derived/core_clients_last_seen_v1/init.sql",
"sql/telemetry/fxa_users_last_seen_raw_v1/init.sql",
"sql/telemetry_derived/core_clients_first_seen_v1/init.sql",
"sql/telemetry_derived/fxa_users_services_last_seen_v1/init.sql",
"sql/messaging_system_derived/cfr_users_last_seen_v1/init.sql",
"sql/messaging_system_derived/onboarding_users_last_seen_v1/init.sql",
"sql/messaging_system_derived/snippets_users_last_seen_v1/init.sql",
"sql/messaging_system_derived/whats_new_panel_users_last_seen_v1/init.sql",
# Reference table not found
"sql/monitoring/structured_detailed_error_counts_v1/view.sql",
"sql/org_mozilla_firefox_derived/migrated_clients_v1/query.sql",
"sql/org_mozilla_firefox_derived/incline_executive_v1/query.sql",
"sql/org_mozilla_firefox/migrated_clients/view.sql",
# No matching signature for function IF
"sql/static/fxa_amplitude_export_users_last_seen/query.sql",
# Duplicate UDF
"sql/static/fxa_amplitude_export_users_daily/query.sql",
# Syntax error
"sql/telemetry_derived/clients_last_seen_v1/init.sql",
# HTTP Error 408: Request Time-out
"sql/telemetry_derived/latest_versions/query.sql",
"sql/telemetry_derived/italy_covid19_outage_v1/query.sql",
# Query parameter not found
"sql/telemetry_derived/experiments_v1/query.sql",
"sql/telemetry_derived/clients_daily_scalar_aggregates_v1/query.sql",
"sql/telemetry_derived/clients_daily_keyed_scalar_aggregates_v1/query.sql",
"sql/telemetry_derived/clients_daily_keyed_boolean_aggregates_v1/query.sql",
"sql/telemetry_derived/clients_daily_histogram_aggregates_v1/query.sql",
"sql/telemetry_derived/clients_daily_keyed_histogram_aggregates_v1/query.sql",
"sql/telemetry_derived/clients_histogram_aggregates_v1/query.sql",
"sql/telemetry_derived/clients_histogram_bucket_counts_v1/query.sql",
"sql/telemetry_derived/glam_client_probe_counts_extract_v1/query.sql",
"sql/telemetry_derived/asn_aggregates_v1/query.sql",
# Dataset moz-fx-data-shared-prod:glam_etl was not found
*glob.glob("sql/glam_etl/**/*.sql", recursive=True),
# Query templates
"sql/search_derived/mobile_search_clients_daily_v1/fenix_metrics.template.sql",
"sql/search_derived/mobile_search_clients_daily_v1/mobile_search_clients_daily.template.sql", # noqa
}
def dry_run_sql_file(sqlfile):
"""Dry run the provided SQL file."""
sql = open(sqlfile).read()
try:
r = urlopen(
Request(
DRY_RUN_URL,
headers={"Content-Type": "application/json"},
data=json.dumps(
{"dataset": basename(dirname(dirname(sqlfile))), "query": sql}
).encode("utf8"),
method="POST",
)
)
except Exception as e:
print(f"{sqlfile:59} ERROR\n", e)
return False
response = json.load(r)
if "errors" in response and len(response["errors"]) == 1:
error = response["errors"][0]
else:
error = None
if response["valid"]:
print(f"{sqlfile:59} OK")
elif (
error
and error.get("code", None) in [400, 403]
and "does not have bigquery.tables.create permission for dataset"
in error.get("message", "")
):
# We want the dryrun service to only have read permissions, so
# we expect CREATE VIEW and CREATE TABLE to throw specific
# exceptions.
print(f"{sqlfile:59} OK")
else:
print(f"{sqlfile:59} ERROR\n", response["errors"])
return False
return True
def main():
"""Dry run all SQL files in the sql/ directory."""
sql_files = [f for f in glob.glob("sql/**/*.sql", recursive=True) if f not in SKIP]
with ThreadPool(8) as p:
result = p.map(dry_run_sql_file, sql_files, chunksize=1)
if all(result):
exitcode = 0
else:
exitcode = 1
sys.exit(exitcode)
if __name__ == "__main__":
main()
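
A note on the error handling in dry_run_sql_file: the dry-run service account is deliberately read-only, so for CREATE TABLE and CREATE VIEW statements a 400/403 error about missing bigquery.tables.create permission is the expected outcome and is reported as OK. A minimal sketch of the two response shapes treated as success, using only the fields the code reads (the full service payload is an assumption):

ok = {"valid": True}
expected_ddl_error = {
    "valid": False,
    "errors": [
        {
            "code": 403,
            "message": "... does not have bigquery.tables.create permission for dataset ...",
        }
    ],
}
# Any other shape prints ERROR and makes dry_run_sql_file return False.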

View file

@@ -1,150 +1,8 @@
#!/usr/bin/env python3
#!/bin/sh
"""
Passes all queries defined under sql/ to a Cloud Function that will run the
queries with the dry_run option enabled.
# Passes all queries defined under sql/ to a Cloud Function that will run the
# queries with the dry_run option enabled.
We could provision BigQuery credentials to the CircleCI job to allow it to run
the queries directly, but there is no way to restrict permissions such that
only dry runs can be performed. In order to reduce risk of CI or local users
accidentally running queries during tests and overwriting production data, we
proxy the queries through the dry run service endpoint.
"""
cd "$(dirname "$0")/.."
from multiprocessing.pool import ThreadPool
from os.path import basename, dirname
from urllib.request import urlopen, Request
import glob
import json
import sys
DRY_RUN_URL = (
"https://us-central1-moz-fx-data-shared-prod.cloudfunctions.net/bigquery-etl-dryrun"
)
SKIP = {
# Access Denied
"sql/activity_stream/impression_stats_flat/view.sql",
"sql/activity_stream/tile_id_types/view.sql",
"sql/monitoring/document_sample_nonprod_v1/query.sql",
"sql/monitoring/schema_error_counts_v1/view.sql",
"sql/monitoring/structured_error_counts_v1/view.sql",
"sql/pocket/pocket_reach_mau/view.sql",
"sql/telemetry/buildhub2/view.sql",
"sql/firefox_accounts_derived/fxa_content_events_v1/query.sql",
"sql/firefox_accounts_derived/fxa_auth_bounce_events_v1/query.sql",
"sql/firefox_accounts_derived/fxa_auth_events_v1/query.sql",
"sql/firefox_accounts_derived/fxa_oauth_events_v1/query.sql",
"sql/firefox_accounts_derived/fxa_log_auth_events_v1/query.sql",
"sql/firefox_accounts_derived/fxa_log_content_events_v1/query.sql",
"sql/telemetry_derived/addons_daily_v1/query.sql",
"sql/search_derived/search_clients_last_seen_v1/init.sql",
"sql/search_derived/search_clients_last_seen_v1/query.sql",
"sql/search/search_rfm/view.sql",
"sql/search/search_clients_last_seen_v1/view.sql",
"sql/search/search_clients_last_seen/view.sql",
"sql/firefox_accounts/fxa_amplitude_email_clicks/view.sql",
"sql/firefox_accounts_derived/fxa_amplitude_export_v1/query.sql",
"sql/firefox_accounts_derived/fxa_amplitude_user_ids_v1/query.sql",
"sql/firefox_accounts_derived/fxa_amplitude_user_ids_v1/init.sql",
"sql/revenue_derived/client_ltv_v1/query.sql",
"sql/shredder_state/progress/view.sql",
# Already exists (and lacks an "OR REPLACE" clause)
"sql/org_mozilla_firefox_derived/clients_first_seen_v1/init.sql",
"sql/org_mozilla_firefox_derived/clients_last_seen_v1/init.sql",
"sql/org_mozilla_fenix_derived/clients_last_seen_v1/init.sql",
"sql/org_mozilla_vrbrowser_derived/clients_last_seen_v1/init.sql",
"sql/telemetry_derived/core_clients_last_seen_v1/init.sql",
"sql/telemetry/fxa_users_last_seen_raw_v1/init.sql",
"sql/telemetry_derived/core_clients_first_seen_v1/init.sql",
"sql/telemetry_derived/fxa_users_services_last_seen_v1/init.sql",
"sql/messaging_system_derived/cfr_users_last_seen_v1/init.sql",
"sql/messaging_system_derived/onboarding_users_last_seen_v1/init.sql",
"sql/messaging_system_derived/snippets_users_last_seen_v1/init.sql",
"sql/messaging_system_derived/whats_new_panel_users_last_seen_v1/init.sql",
# Reference table not found
"sql/monitoring/structured_detailed_error_counts_v1/view.sql",
"sql/org_mozilla_firefox_derived/migrated_clients_v1/query.sql",
"sql/org_mozilla_firefox_derived/incline_executive_v1/query.sql",
"sql/org_mozilla_firefox/migrated_clients/view.sql",
# No matching signature for function IF
"sql/static/fxa_amplitude_export_users_last_seen/query.sql",
# Duplicate UDF
"sql/static/fxa_amplitude_export_users_daily/query.sql",
# Syntax error
"sql/telemetry_derived/clients_last_seen_v1/init.sql",
# HTTP Error 408: Request Time-out
"sql/telemetry_derived/latest_versions/query.sql",
"sql/telemetry_derived/italy_covid19_outage_v1/query.sql",
# Query parameter not found
"sql/telemetry_derived/experiments_v1/query.sql",
"sql/telemetry_derived/clients_daily_scalar_aggregates_v1/query.sql",
"sql/telemetry_derived/clients_daily_keyed_scalar_aggregates_v1/query.sql",
"sql/telemetry_derived/clients_daily_keyed_boolean_aggregates_v1/query.sql",
"sql/telemetry_derived/clients_daily_histogram_aggregates_v1/query.sql",
"sql/telemetry_derived/clients_daily_keyed_histogram_aggregates_v1/query.sql",
"sql/telemetry_derived/clients_histogram_aggregates_v1/query.sql",
"sql/telemetry_derived/clients_histogram_bucket_counts_v1/query.sql",
"sql/telemetry_derived/glam_client_probe_counts_extract_v1/query.sql",
"sql/telemetry_derived/asn_aggregates_v1/query.sql",
# Dataset moz-fx-data-shared-prod:glam_etl was not found
*glob.glob("sql/glam_etl/**/*.sql", recursive=True),
# Query templates
"sql/search_derived/mobile_search_clients_daily_v1/fenix_metrics.template.sql",
"sql/search_derived/mobile_search_clients_daily_v1/mobile_search_clients_daily.template.sql", # noqa
}
def worker_entrypoint(sqlfile):
sql = open(sqlfile).read()
try:
r = urlopen(
Request(
DRY_RUN_URL,
headers={"Content-Type": "application/json"},
data=json.dumps(
{"dataset": basename(dirname(dirname(sqlfile))), "query": sql}
).encode("utf8"),
method="POST",
)
)
except Exception as e:
print(f"{sqlfile:59} ERROR\n", e)
return False
response = json.load(r)
if "errors" in response and len(response["errors"]) == 1:
error = response["errors"][0]
else:
error = None
if response["valid"]:
print(f"{sqlfile:59} OK")
elif (
error
and error.get("code", None) in [400, 403]
and "does not have bigquery.tables.create permission for dataset"
in error.get("message", "")
):
# We want the dryrun service to only have read permissions, so
# we expect CREATE VIEW and CREATE TABLE to throw specific
# exceptions.
print(f"{sqlfile:59} OK")
else:
print(f"{sqlfile:59} ERROR\n", response["errors"])
return False
return True
def main():
sql_files = [f for f in glob.glob("sql/**/*.sql", recursive=True) if f not in SKIP]
with ThreadPool(8) as p:
result = p.map(worker_entrypoint, sql_files, chunksize=1)
if all(result):
exitcode = 0
else:
exitcode = 1
sys.exit(exitcode)
if __name__ == "__main__":
main()
exec python3 -m bigquery_etl.dryrun "$@"
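
With the module extracted, the script becomes a thin shell wrapper: it changes to the repository root so that relative sql/**/*.sql globs resolve, then delegates to bigquery_etl.dryrun. A rough Python equivalent of those two lines (the repository path is hypothetical):

import os
import subprocess

os.chdir("/path/to/bigquery-etl")  # repo root, so "sql/**/*.sql" globs resolve
subprocess.run(["python3", "-m", "bigquery_etl.dryrun"], check=True)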