Anna Scholtz 2020-07-10 14:09:59 -07:00
Parent 2d8e08294b
Commit 2fa01c6ddb
4 changed files with 115 additions and 0 deletions

View file

@@ -103,6 +103,7 @@ class Task:
    multipart: bool = attr.ib(False)
    sql_file_path: Optional[str] = None
    priority: Optional[int] = None
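    # when True, skip the dry run used to detect this task's dependencies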
    skip_dry_run: bool = attr.ib(False)
    referenced_tables: Optional[List[Tuple[str, str]]] = attr.ib(None)

    @owner.validator
@@ -235,8 +236,43 @@ class Task:
        of dependencies. See https://cloud.google.com/bigquery/docs/reference/
        rest/v2/Job#JobStatistics2.FIELDS.referenced_tables
        """
        if self.skip_dry_run:
            logging.info(f"Skip getting dependencies for {self.task_name}")
            return []

        logging.info(f"Get dependencies for {self.task_name}")
        # check if there are any query parameters that need to be set for dry-running
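        # each parameter is a "name:type:value" string, e.g. "n_clients:INT64:500"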
        query_parameters = [
            bigquery.ScalarQueryParameter(*(param.split(":")))
            for param in self.parameters
            if "submission_date" not in param
        ]
        # the submission_date parameter needs to be set to make the dry run faster
        job_config = bigquery.QueryJobConfig(
            dry_run=True,
            use_query_cache=False,
            default_dataset=f"{DEFAULT_PROJECT}.{self.dataset}",
            query_parameters=[
                bigquery.ScalarQueryParameter("submission_date", "DATE", "2019-01-01")
            ]
            + query_parameters,
        )
        table_names = set()
        query_files = [self.query_file]
        if self.multipart:
            # dry run all files if the query is split into multiple parts
            query_files = glob.glob(self.sql_file_path + "/*.sql")
        for query_file in query_files:
            with open(query_file) as query_stream:
                query = query_stream.read()
                query_job = client.query(query, job_config=job_config)
                referenced_tables = query_job.referenced_tables
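                # referenced_tables comes from the dry run's job statistics;
                # the query itself is never executed or billed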
        if self.referenced_tables is None:
            # the submission_date parameter needs to be set to make the dry run faster
            job_config = bigquery.QueryJobConfig(

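Aside: the dry-run dependency detection above can be tried standalone. A minimal sketch, assuming default GCP credentials; the query and table name are illustrative, not part of this commit:

    from google.cloud import bigquery

    client = bigquery.Client()

    # dry_run=True makes BigQuery plan the query without executing or billing it
    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    query_job = client.query(
        "SELECT COUNT(*) FROM `moz-fx-data-shared-prod.telemetry_derived.asn_aggregates_v1`",
        job_config=job_config,
    )

    # JobStatistics2.referenced_tables lists every table the query would read
    for table in query_job.referenced_tables:
        print(table.project, table.dataset_id, table.table_id)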
View file

@@ -152,6 +152,15 @@ bqetl_document_sample:
    retries: 2
    retry_delay: 30m
bqetl_asn_aggregates:
  schedule_interval: 0 2 * * *
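  # cron "0 2 * * *": runs daily at 02:00 UTC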
  default_args:
    owner: ascholtz@mozilla.com
    start_date: '2020-04-05'
    email: ['ascholtz@mozilla.com', 'tdsmith@mozilla.com']
    retries: 2
    retry_delay: 30m
# DAG for exporting query data marked as public to GCS
# queries should not be explicitly assigned to this DAG (it's done automatically)
bqetl_public_data_json:

View file

@@ -0,0 +1,59 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/master/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor
import datetime
from utils.gcp import bigquery_etl_query
default_args = {
"owner": "ascholtz@mozilla.com",
"start_date": datetime.datetime(2020, 4, 5, 0, 0),
"email": ["ascholtz@mozilla.com", "tdsmith@mozilla.com"],
"depends_on_past": False,
"retry_delay": datetime.timedelta(seconds=1800),
"email_on_failure": True,
"email_on_retry": True,
"retries": 2,
}
with DAG(
    "bqetl_asn_aggregates", default_args=default_args, schedule_interval="0 2 * * *"
) as dag:
    telemetry_derived__asn_aggregates__v1 = bigquery_etl_query(
        task_id="telemetry_derived__asn_aggregates__v1",
        destination_table="asn_aggregates_v1",
        dataset_id="telemetry_derived",
        project_id="moz-fx-data-shared-prod",
        owner="tdsmith@mozilla.com",
        email=["ascholtz@mozilla.com", "tdsmith@mozilla.com"],
        date_partition_parameter="submission_date",
        depends_on_past=False,
        parameters=["n_clients:INT64:500"],
        dag=dag,
    )
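    # date_partition_parameter and parameters mirror the scheduling block in the
    # query's metadata.yaml (see the last file in this commit)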
    wait_for_copy_deduplicate_event_events = ExternalTaskSensor(
        task_id="wait_for_copy_deduplicate_event_events",
        external_dag_id="copy_deduplicate",
        external_task_id="event_events",
        check_existence=True,
        mode="reschedule",
        dag=dag,
    )
    telemetry_derived__asn_aggregates__v1.set_upstream(
        wait_for_copy_deduplicate_event_events
    )
    wait_for_copy_deduplicate_bq_main_events = ExternalTaskSensor(
        task_id="wait_for_copy_deduplicate_bq_main_events",
        external_dag_id="copy_deduplicate",
        external_task_id="bq_main_events",
        check_existence=True,
        mode="reschedule",
        dag=dag,
    )
    telemetry_derived__asn_aggregates__v1.set_upstream(
        wait_for_copy_deduplicate_bq_main_events
    )
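Aside: set_upstream is equivalent to Airflow's bitshift operators, so the wiring above could also be written as follows (same identifiers as in the generated DAG):

    telemetry_derived__asn_aggregates__v1 << wait_for_copy_deduplicate_event_events
    telemetry_derived__asn_aggregates__v1 << wait_for_copy_deduplicate_bq_main_events

The sensors use mode="reschedule", which releases the worker slot between pokes instead of blocking it while waiting for copy_deduplicate to finish.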

View file

@@ -9,3 +9,14 @@ owners:
labels:
  incremental: true
  schedule: daily
scheduling:
  dag_name: bqetl_asn_aggregates
  parameters: ["n_clients:INT64:500"]
  skip_dry_run: true  # query accesses payload_bytes_raw.telemetry
  depends_on:
    - dag_name: copy_deduplicate
      task_id: event_events
      execution_delta: 1h
    - dag_name: copy_deduplicate
      task_id: bq_main_events
      execution_delta: 1h
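Note: execution_delta tells an ExternalTaskSensor how far before this DAG's execution date to look for the upstream run; the DAG generated in this commit does not yet set it on its sensors. A hedged sketch of how the 1h delta would appear on a generated sensor, assuming copy_deduplicate is scheduled one hour earlier than bqetl_asn_aggregates (that upstream schedule is an assumption, not shown in this diff):

    import datetime

    from airflow.operators.sensors import ExternalTaskSensor

    wait_for_event_events = ExternalTaskSensor(
        task_id="wait_for_copy_deduplicate_event_events",
        external_dag_id="copy_deduplicate",
        external_task_id="event_events",
        # match the copy_deduplicate run one hour before this DAG's execution date
        execution_delta=datetime.timedelta(hours=1),
        check_existence=True,
        mode="reschedule",
        dag=dag,  # the DAG object from the generated file above
    )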