Bug 1635906 Add bqetl support for scripts and script for AET lookup (#1323)
* Bug 1635906 Add bqetl support for scripts and script for AET lookup

  There are some code changes here for DAG generation and for testing.

* Apply suggestions from code review

  Co-authored-by: Anna Scholtz <anna@scholtzan.net>

* DAG fixups

Co-authored-by: Anna Scholtz <anna@scholtzan.net>
Parent: 6201f5610b
Commit: d539fafb59
@@ -1,6 +1,7 @@
*.pyc
*.swp
*.swo
*.egg-info/
.DS_Store
.mypy_cache/
venv/
@@ -69,6 +69,7 @@ class SqlTest(pytest.Item, pytest.File):
         dataset_name = self.fspath.dirpath().dirpath().basename

         init_test = False
+        script_test = False

         # init tests write to dataset_query_test, instead of their
         # default name
@@ -83,6 +84,9 @@ class SqlTest(pytest.Item, pytest.File):
             )
             query = query.replace(original, dest_name)
             query_name = dest_name
+        elif test_name == "test_script":
+            script_test = True
+            query = read(f"{self.fspath.dirname.replace('tests', 'sql')}/script.sql")
         else:
             query = read(f"{self.fspath.dirname.replace('tests', 'sql')}/query.sql")

@@ -116,6 +120,7 @@ class SqlTest(pytest.Item, pytest.File):
                 )
                 query = query.replace(original, table_name)
                 tables[table_name] = Table(table_name, source_format, source_path)
+                print(f"Initialized {table_name}")
             elif extension == "sql":
                 if "." in table_name:
                     # combine project and dataset name with table name
@@ -141,7 +146,20 @@ class SqlTest(pytest.Item, pytest.File):
         # configure job
         res_table = bigquery.TableReference(default_dataset, query_name)

-        if not init_test:
+        if init_test or script_test:
+            job_config = bigquery.QueryJobConfig(
+                default_dataset=default_dataset,
+                query_parameters=get_query_params(self.fspath.strpath),
+                use_legacy_sql=False,
+            )
+
+            bq.query(query, job_config=job_config).result()
+            # Retrieve final state of table on init or script tests
+            job = bq.query(
+                f"SELECT * FROM {dataset_id}.{query_name}", job_config=job_config
+            )
+
+        else:
             job_config = bigquery.QueryJobConfig(
                 default_dataset=default_dataset,
                 destination=res_table,
@@ -153,19 +171,6 @@ class SqlTest(pytest.Item, pytest.File):
             # run query
             job = bq.query(query, job_config=job_config)

-        else:
-            job_config = bigquery.QueryJobConfig(
-                default_dataset=default_dataset,
-                query_parameters=get_query_params(self.fspath.strpath),
-                use_legacy_sql=False,
-            )
-
-            bq.query(query, job_config=job_config).result()
-            # retrieve results from new table on init test
-            job = bq.query(
-                f"SELECT * FROM {dataset_id}.{query_name}", job_config=job_config
-            )
-
         result = list(coerce_result(*job.result()))
         result.sort(key=lambda row: json.dumps(row, sort_keys=True))
         expect.sort(key=lambda row: json.dumps(row, sort_keys=True))
@@ -14,6 +14,7 @@ DEFAULT_SQL_DIR = "sql/"
 DEFAULT_DAGS_FILE = "dags.yaml"
 QUERY_FILE = "query.sql"
 QUERY_PART_FILE = "part1.sql"
+SCRIPT_FILE = "script.sql"
 DEFAULT_DAGS_DIR = "dags"
 TELEMETRY_AIRFLOW_GITHUB = "https://github.com/mozilla/telemetry-airflow.git"

@@ -64,6 +65,9 @@ def get_dags(sql_dir, dags_config):
                     task = Task.of_multipart_query(
                         query_file, dag_collection=dag_collection
                     )
+                elif SCRIPT_FILE in files:
+                    query_file = os.path.join(root, SCRIPT_FILE)
+                    task = Task.of_script(query_file, dag_collection=dag_collection)
                 else:
                     continue
             except FileNotFoundError:
@@ -24,7 +24,8 @@ from bigquery_etl.query_scheduling.utils import (

 AIRFLOW_TASK_TEMPLATE = "airflow_task.j2"
 QUERY_FILE_RE = re.compile(
-    r"^.*/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+)_(v[0-9]+)/(?:query\.sql|part1\.sql)$"
+    r"^.*/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+)_(v[0-9]+)/"
+    r"(?:query\.sql|part1\.sql|script\.sql)$"
 )
 DEFAULT_PROJECT = "moz-fx-data-shared-prod"
 DEFAULT_DESTINATION_TABLE_STR = "use-default-destination-table"
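A quick standalone check of the widened pattern (plain re, outside the bigquery_etl module) against the script path introduced in this commit; it yields the same dataset/table/version groups as a query.sql path would:

import re

QUERY_FILE_RE = re.compile(
    r"^.*/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+)_(v[0-9]+)/"
    r"(?:query\.sql|part1\.sql|script\.sql)$"
)

match = QUERY_FILE_RE.match(
    "sql/account_ecosystem_derived/ecosystem_user_id_lookup_v1/script.sql"
)
# -> ('account_ecosystem_derived', 'ecosystem_user_id_lookup', 'v1')
print(match.groups())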
@@ -284,6 +285,19 @@ class Task:
         task.sql_file_path = os.path.dirname(query_file)
         return task

+    @classmethod
+    def of_script(cls, query_file, metadata=None, dag_collection=None):
+        """
+        Create task that schedules the corresponding script in Airflow.
+
+        Raises FileNotFoundError if no metadata file exists for query.
+        If `metadata` is set, then it is used instead of the metadata.yaml
+        file that might exist alongside the query file.
+        """
+        task = cls.of_query(query_file, metadata, dag_collection)
+        task.sql_file_path = query_file
+        return task
+
     def _get_referenced_tables(self):
         """
         Perform a dry_run to get tables the query depends on.
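A minimal usage sketch, based only on the hunk above: of_script delegates metadata and scheduling handling to of_query and then points sql_file_path at the script itself, whereas of_multipart_query records the query's directory. The path is the one used in the generated DAG below; the call assumes a metadata.yaml exists next to the script.

from bigquery_etl.query_scheduling.task import Task

script_file = "sql/account_ecosystem_derived/ecosystem_user_id_lookup_v1/script.sql"

# Delegates to of_query, then records the script file itself so the
# rendered Airflow task executes script.sql directly.
task = Task.of_script(script_file)
assert task.sql_file_path == script_file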
dags.yaml

@@ -200,11 +200,20 @@ bqetl_deletion_request_volume:
     retries: 2
     retry_delay: 30m

-bqetl_fenix_event_rollup: # name of the DAG; must start with bqetl_
-  schedule_interval: 0 2 * * * # query schedule
+bqetl_fenix_event_rollup:
+  schedule_interval: 0 2 * * *
   default_args:
     owner: frank@mozilla.com
-    start_date: '2020-09-09' # YYYY-MM-DD
+    start_date: '2020-09-09'
     email: ['frank@mozilla.com']
-    retries: 2 # number of retries if the query execution fails
+    retries: 2
     retry_delay: 30m
+
+bqetl_account_ecosystem:
+  schedule_interval: 0 2 * * *
+  default_args:
+    owner: jklukas@mozilla.com
+    start_date: '2020-09-17'
+    email: ['jklukas@mozilla.com']
+    retries: 2
+    retry_delay: 30m
@@ -0,0 +1,49 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/master/bigquery_etl/query_scheduling/generate_airflow_dags.py

from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor
import datetime
from utils.gcp import bigquery_etl_query

default_args = {
    "owner": "jklukas@mozilla.com",
    "start_date": datetime.datetime(2020, 9, 17, 0, 0),
    "email": ["jklukas@mozilla.com"],
    "depends_on_past": False,
    "retry_delay": datetime.timedelta(seconds=1800),
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 2,
}

with DAG(
    "bqetl_account_ecosystem", default_args=default_args, schedule_interval="0 2 * * *"
) as dag:

    account_ecosystem_derived__ecosystem_user_id_lookup__v1 = bigquery_etl_query(
        task_id="account_ecosystem_derived__ecosystem_user_id_lookup__v1",
        destination_table="ecosystem_user_id_lookup_v1",
        dataset_id="account_ecosystem_derived",
        project_id="moz-fx-data-shared-prod",
        owner="jklukas@mozilla.com",
        email=["jklukas@mozilla.com"],
        date_partition_parameter=None,
        depends_on_past=True,
        parameters=["submission_date:DATE:{{ds}}"],
        sql_file_path="sql/account_ecosystem_derived/ecosystem_user_id_lookup_v1/script.sql",
        dag=dag,
    )

    wait_for_copy_deduplicate_all = ExternalTaskSensor(
        task_id="wait_for_copy_deduplicate_all",
        external_dag_id="copy_deduplicate",
        external_task_id="copy_deduplicate_all",
        execution_delta=datetime.timedelta(seconds=3600),
        check_existence=True,
        mode="reschedule",
        pool="DATA_ENG_EXTERNALTASKSENSOR",
    )

    account_ecosystem_derived__ecosystem_user_id_lookup__v1.set_upstream(
        wait_for_copy_deduplicate_all
    )
@@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS
  `moz-fx-data-shared-prod.account_ecosystem_derived.ecosystem_user_id_lookup_v1`(
    ecosystem_user_id STRING NOT NULL,
    canonical_id STRING NOT NULL,
    first_seen_date DATE NOT NULL
  )
@@ -0,0 +1,22 @@
friendly_name: Ecosystem User ID Lookup
description: >
  Lookup table of ecosystem_user_id to canonical_id.

  The canonical_id is the first ecosystem_user_id value observed for each user.
  Each ecosystem_user_id ever observed is guaranteed to appear exactly once.
  Many ecosystem_user_id values may map to the same canonical_id since the
  value associated with a user changes whenever the user changes their password.
owners:
  - jklukas@mozilla.com
labels:
  application: aet
  schedule: daily
  incremental: true
scheduling:
  dag_name: bqetl_account_ecosystem
  depends_on_past: True
  referenced_tables: [['firefox_accounts_stable', 'account_ecosystem_v1']]
  # This is an unpartitioned table where the script adds rows via INSERT INTO,
  # thus the custom settings below.
  date_partition_parameter: null
  parameters: ["submission_date:DATE:{{ds}}"]
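For reference, the scheduling keys above surface directly as keyword arguments on the bigquery_etl_query call in the generated DAG earlier in this diff; a side-by-side sketch with values copied from this diff (the dict names are illustrative, not part of the module):

# metadata.yaml scheduling block ...
scheduling = {
    "dag_name": "bqetl_account_ecosystem",
    "depends_on_past": True,
    # unpartitioned table; the script INSERTs new rows itself
    "date_partition_parameter": None,
    "parameters": ["submission_date:DATE:{{ds}}"],
}

# ... becomes these kwargs on the rendered Airflow task
task_kwargs = dict(
    depends_on_past=scheduling["depends_on_past"],
    date_partition_parameter=scheduling["date_partition_parameter"],
    parameters=scheduling["parameters"],
)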
@@ -0,0 +1,112 @@
DECLARE checksum_pre,
  checksum_post,
  run_away_stop INT64 DEFAULT 0;

-- We initialize our "working set" temp table as the distinct set of client
-- observations on the target date.
CREATE TEMP TABLE
  working_set
AS
WITH unioned AS (
  SELECT
    submission_timestamp,
    ecosystem_user_id,
    previous_ecosystem_user_ids[SAFE_OFFSET(0)] AS previous_ecosystem_user_id
  FROM
    firefox_accounts_stable.account_ecosystem_v1
),
aggregated AS (
  SELECT
    ecosystem_user_id,
    array_agg(DISTINCT previous_ecosystem_user_id IGNORE NULLS) AS previous_ecosystem_user_ids
  FROM
    unioned
  WHERE
    DATE(submission_timestamp) = @submission_date
  GROUP BY
    ecosystem_user_id
)
SELECT
  ecosystem_user_id,
  IF(
    array_length(previous_ecosystem_user_ids) > 1,
    ERROR(FORMAT("Found more than 1 previous ID for %s", ecosystem_user_id)),
    previous_ecosystem_user_ids[SAFE_OFFSET(0)]
  ) AS previous_ecosystem_user_id
FROM
  aggregated;

-- If a user resets their FxA password multiple times in a single day, they
-- will receive several new ecosystem_user_id values, each time emitting an
-- event with the association between the new ID and the old. We need to be
-- able to follow this chain of events to associate each of these IDs back
-- to the earliest one.
-- To accommodate this, we enter a loop where on each iteration we rewrite
-- the working_set, linking back one step in the chain of IDs. We know we're
-- done when the working_set remains unchanged after an iteration.
LOOP
  SET run_away_stop = IF(
    run_away_stop >= 50,
    ERROR(
      "Did not converge after 50 iterations; there may be an infinite loop or a pathological client"
    ),
    run_away_stop + 1
  );

  SET checksum_pre = checksum_post;

  CREATE OR REPLACE TEMP TABLE working_set
  AS
  SELECT
    newer.ecosystem_user_id,
    coalesce(
      older.previous_ecosystem_user_id,
      newer.previous_ecosystem_user_id
    ) AS previous_ecosystem_user_id
  FROM
    working_set AS newer
  LEFT JOIN
    working_set AS older
  ON
    (newer.previous_ecosystem_user_id = older.ecosystem_user_id);

  SET checksum_post = (
    SELECT
      BIT_XOR(FARM_FINGERPRINT(previous_ecosystem_user_id))
    FROM
      working_set
  );

  IF(checksum_pre = checksum_post)
  THEN
    BREAK;
  END IF;
END LOOP;

-- We now insert into the target table, including only newly observed
-- ecosystem_user_id values.
INSERT INTO
  ecosystem_user_id_lookup_v1(ecosystem_user_id, canonical_id, first_seen_date)
WITH canonical_daily AS (
  SELECT
    ecosystem_user_id,
    COALESCE(previous_ecosystem_user_id, ecosystem_user_id) AS canonical_today
  FROM
    working_set
)
SELECT
  cd.ecosystem_user_id,
  coalesce(ci.canonical_id, cd.canonical_today) AS canonical_id,
  @submission_date AS first_seen_date
FROM
  canonical_daily cd
LEFT JOIN
  ecosystem_user_id_lookup_v1 AS ci
ON
  (cd.canonical_today = ci.ecosystem_user_id)
LEFT JOIN
  ecosystem_user_id_lookup_v1 AS existing
ON
  (cd.ecosystem_user_id = existing.ecosystem_user_id)
WHERE
  existing.ecosystem_user_id IS NULL;
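The LOOP in the script above is a fixed-point iteration over ID chains: each pass rewrites previous_ecosystem_user_id one step further back, and the checksum comparison detects when nothing changed. The same idea in a standalone Python sketch with toy data (IDs chosen to match the test fixtures below; assumes chains are acyclic, which the run_away_stop guard enforces in the SQL):

# Toy model of the convergence loop: id -> previous id
working_set = {"a2": "a1", "a3": "a2", "b2": "b1"}

for _ in range(50):  # mirrors the run_away_stop guard
    # Link each ID one more step back along its chain, exactly like the
    # LEFT JOIN of working_set onto itself with coalesce().
    updated = {
        newer_id: working_set.get(prev, prev)
        for newer_id, prev in working_set.items()
    }
    if updated == working_set:  # checksum_pre == checksum_post
        break
    working_set = updated

# a3 now links all the way back to a1, its canonical_id
assert working_set == {"a2": "a1", "a3": "a1", "b2": "b1"}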
@@ -66,11 +66,13 @@ SELECT
 How to Configure a Generated Test
 ===

-1. Make a directory for test resources named `tests/{dataset}/{query_name}/{test_name}/`,
+1. Make a directory for test resources named `tests/{dataset}/{table}/{test_name}/`,
    e.g. `tests/telemetry_derived/clients_last_seen_raw_v1/test_single_day`
-   - `query_name` must match a query file named `sql/{dataset}/{query_name}.sql`, e.g.
-     `sql/telemetry_derived/clients_last_seen_v1.sql`
+   - `table` must match a directory named like `sql/{dataset}/{table}`, e.g.
+     `sql/telemetry_derived/clients_last_seen_v1`
    - `test_name` should start with `test_`, e.g. `test_single_day`
+   - If `test_name` is `test_init` or `test_script`, then the query will run `init.sql`
+     or `script.sql` respectively; otherwise, the test will run `query.sql`
 1. Add `.yaml` files for input tables, e.g. `clients_daily_v6.yaml`
    - Include the dataset prefix if it's set in the tested query,
      e.g. `analysis.clients_last_seen_v1.yaml`
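To make the path convention concrete, a small standalone sketch of the lookup the test harness performs, mirroring the replace('tests', 'sql') logic in the conftest hunk earlier in this diff; the test directory name here is assumed, chosen to match the AET fixtures added below:

# Hypothetical test directory for the new script test
test_dir = "tests/account_ecosystem_derived/ecosystem_user_id_lookup_v1/test_script"
test_name = test_dir.rsplit("/", 1)[1]

# test_name picks which SQL file gets executed
if test_name == "test_init":
    sql_file = "init.sql"
elif test_name == "test_script":
    sql_file = "script.sql"
else:
    sql_file = "query.sql"

sql_dir = "/".join(test_dir.split("/")[:-1]).replace("tests", "sql", 1)
# -> sql/account_ecosystem_derived/ecosystem_user_id_lookup_v1/script.sql
print(f"{sql_dir}/{sql_file}")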
@@ -0,0 +1,15 @@
- ecosystem_user_id: a1
  canonical_id: a1
  first_seen_date: '2020-01-01'
- ecosystem_user_id: a2
  canonical_id: a1
  first_seen_date: '2020-01-03'
- ecosystem_user_id: a3
  canonical_id: a1
  first_seen_date: '2020-01-05'
- ecosystem_user_id: b1
  canonical_id: b1
  first_seen_date: '2020-02-01'
- ecosystem_user_id: b2
  canonical_id: b1
  first_seen_date: '2020-03-14'
@@ -0,0 +1,27 @@
- ecosystem_user_id: a1
  canonical_id: a1
  first_seen_date: '2020-01-01'
- ecosystem_user_id: a2
  canonical_id: a1
  first_seen_date: '2020-01-03'
- ecosystem_user_id: a3
  canonical_id: a1
  first_seen_date: '2020-01-05'
- ecosystem_user_id: b1
  canonical_id: b1
  first_seen_date: '2020-02-01'
- ecosystem_user_id: b2
  canonical_id: b1
  first_seen_date: '2020-03-14'
- ecosystem_user_id: a0
  canonical_id: a1
  first_seen_date: '2020-04-01'
- ecosystem_user_id: b3
  canonical_id: b1
  first_seen_date: '2020-04-01'
- ecosystem_user_id: b4
  canonical_id: b1
  first_seen_date: '2020-04-01'
- ecosystem_user_id: c1
  canonical_id: c1
  first_seen_date: '2020-04-01'
@@ -0,0 +1,21 @@
- submission_timestamp: 2020-04-01 22:09:03.243074 UTC
  ecosystem_user_id: a0
  previous_ecosystem_user_ids: null
- submission_timestamp: 2020-04-01 22:09:03.243074 UTC
  ecosystem_user_id: a0
  previous_ecosystem_user_ids: [a3]
- submission_timestamp: 2020-04-01 22:09:03.243074 UTC
  ecosystem_user_id: a0
  previous_ecosystem_user_ids: null
- submission_timestamp: 2020-04-01 22:09:03.243074 UTC
  ecosystem_user_id: b3
  previous_ecosystem_user_ids: [b2]
- submission_timestamp: 2020-04-01 22:09:03.243074 UTC
  ecosystem_user_id: b3
  previous_ecosystem_user_ids: [b2]
- submission_timestamp: 2020-04-01 22:09:03.243074 UTC
  ecosystem_user_id: b4
  previous_ecosystem_user_ids: [b3]
- submission_timestamp: 2020-04-01 22:09:03.243074 UTC
  ecosystem_user_id: c1
  previous_ecosystem_user_ids: null
@@ -0,0 +1,3 @@
- name: submission_date
  type: DATE
  value: 2020-04-01