Error aggregates as scheduled query

Anna Scholtz 2020-05-13 14:55:57 -07:00
Parent 1116be16b6
Commit 22f1c39b5b
7 changed files: 51 additions and 5 deletions

View file

@@ -9,6 +9,7 @@ from google.cloud import bigquery
 from ..util import standard_args
 import shutil
 import tempfile
+from pathlib import Path
 from bigquery_etl.query_scheduling.dag_collection import DagCollection
 from bigquery_etl.query_scheduling.task import Task, UnscheduledTask
@@ -17,7 +18,7 @@ from bigquery_etl.query_scheduling.task import Task, UnscheduledTask
 DEFAULT_SQL_DIR = "sql/"
 DEFAULT_DAGS_FILE = "dags.yaml"
 QUERY_FILE = "query.sql"
-DEFAULT_DAGS_DIR = "dags/"
+DEFAULT_DAGS_DIR = "dags"
 TELEMETRY_AIRFLOW_GITHUB = "https://github.com/mozilla/telemetry-airflow.git"
 parser = ArgumentParser(description=__doc__)
@@ -105,11 +106,12 @@ def main():
     """Generate Airflow DAGs."""
     args = parser.parse_args()
     client = bigquery.Client(args.project_id)
+    dags_output_dir = Path(args.output_dir)
     dags = get_dags(args.sql_dir, args.dags_config)
-    dags.to_airflow_dags(args.output_dir, client)
+    dags.to_airflow_dags(dags_output_dir, client)
-    setup_telemetry_airflow(args.output_dir)
+    setup_telemetry_airflow(dags_output_dir)
 if __name__ == "__main__":
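Converting args.output_dir to a pathlib.Path once, right after argument parsing, lets the code that writes DAG files join paths with the / operator instead of concatenating strings, which is also why DEFAULT_DAGS_DIR no longer needs its trailing slash. A minimal sketch of the pattern (write_dag and the file name are illustrative, not code from this commit):

    from pathlib import Path

    def write_dag(dags_output_dir: Path, dag_name: str, rendered: str) -> None:
        # Path joins behave the same whether or not the directory
        # string ended in "/", so "dags" works as well as "dags/".
        dags_output_dir.mkdir(parents=True, exist_ok=True)
        (dags_output_dir / f"{dag_name}.py").write_text(rendered)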

View file

@@ -88,7 +88,16 @@ class Task:
         of dependencies. See https://cloud.google.com/bigquery/docs/reference/
         rest/v2/Job#JobStatistics2.FIELDS.referenced_tables
         """
-        job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
+        logging.info(f"Get dependencies for {self.task_name}")
+
+        # the submission_date parameter needs to be set to make the dry run faster
+        job_config = bigquery.QueryJobConfig(
+            dry_run=True,
+            use_query_cache=False,
+            query_parameters=[
+                bigquery.ScalarQueryParameter("submission_date", "DATE", "2019-01-01")
+            ],
+        )

         with open(self.query_file) as query_stream:
             query = query_stream.read()
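The dry run asks BigQuery to plan the query without executing it; the plan's statistics list every table the query references, which is how a task's upstream dependencies are discovered. Pinning @submission_date keeps the planner from having to consider every date partition. A self-contained sketch of that flow, assuming default credentials and an illustrative query (not the exact Task method):

    from google.cloud import bigquery

    client = bigquery.Client("moz-fx-data-shared-prod")  # assumed project
    job_config = bigquery.QueryJobConfig(
        dry_run=True,            # plan only; nothing is executed or billed
        use_query_cache=False,
        query_parameters=[
            bigquery.ScalarQueryParameter("submission_date", "DATE", "2019-01-01")
        ],
    )
    query = """
        SELECT COUNT(*)
        FROM telemetry_derived.error_aggregates_v1  -- illustrative table
        WHERE submission_date = @submission_date
    """
    job = client.query(query, job_config=job_config)
    for table in job.referenced_tables:
        print(table.project, table.dataset_id, table.table_id)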

View file

@@ -10,6 +10,7 @@ with DAG("{{ name }}", default_args={{ default_args }}, schedule_interval="{{ schedule_interval }}") as dag:
     {{ task.task_name }} = bigquery_etl_query(
         destination_table="{{ task.table }}",
         dataset_id="{{ task.dataset }}",
+        project_id='moz-fx-data-shared-prod',
         dag=dag,
     )
@@ -17,7 +18,7 @@ with DAG("{{ name }}", default_args={{ default_args }}, schedule_interval="{{ schedule_interval }}") as dag:
     wait_for_{{ dependency.task_name }} = ExternalTaskSensor(
         task_id="wait_for_{{ dependency.task_name }}",
         external_dag_id="{{ dependency.dag_name }}",
-        external_task_id="{{ dependency.task_name }}",
+        external_task_id="{{ dependency.task_name }}",
         dag=dag,
     )
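For a task with a single upstream dependency, the sensor block above renders to something like the following (the DAG and task names here are made up for illustration):

    wait_for_telemetry_derived__clients_daily__v6 = ExternalTaskSensor(
        task_id="wait_for_telemetry_derived__clients_daily__v6",
        external_dag_id="bqetl_core",
        external_task_id="telemetry_derived__clients_daily__v6",
        dag=dag,
    )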

View file

@@ -0,0 +1,7 @@
+bqetl_error_aggregates:
+  schedule_interval: hourly # todo: datetime.timedelta(hours=3)
+  default_args:
+    owner: bewu@mozilla.com
+    email: ['telemetry-alerts@mozilla.com', 'bewu@mozilla.com', 'wlachance@mozilla.com']
+    start_date: '2019-11-01'
+    retries: 1
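Airflow's schedule_interval accepts a cron string, a preset such as '@hourly', or a datetime.timedelta; the todo records that this DAG is really meant to run every three hours once the generator can emit non-string intervals. A sketch of what that would look like, reusing the default_args from this file:

    from datetime import timedelta
    from airflow import DAG

    default_args = {
        "owner": "bewu@mozilla.com",
        "email": ["telemetry-alerts@mozilla.com", "bewu@mozilla.com",
                  "wlachance@mozilla.com"],
        "start_date": "2019-11-01",
        "retries": 1,
    }

    # the cadence the todo asks for: one run every three hours
    dag = DAG("bqetl_error_aggregates", default_args=default_args,
              schedule_interval=timedelta(hours=3))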

View file

@@ -0,0 +1,16 @@
+# Generated via query_scheduling/generate_airflow_dags
+from airflow import DAG
+from airflow.operators.sensors import ExternalTaskSensor
+from datetime import datetime, timedelta
+from utils.gcp import bigquery_etl_query
+
+with DAG("bqetl_error_aggregates", default_args={'owner': 'bewu@mozilla.com', 'email': ['telemetry-alerts@mozilla.com', 'bewu@mozilla.com', 'wlachance@mozilla.com'], 'start_date': '2019-11-01', 'retries': 1}, schedule_interval="hourly") as dag:
+    telemetry_derived__error_aggregates__v1 = bigquery_etl_query(
+        destination_table="error_aggregates",
+        dataset_id="telemetry_derived",
+        dag=dag,
+    )

View file

@@ -0,0 +1,11 @@
+friendly_name: Error aggregates
+description: >-
+  Counts of various error measures aggregated across each unique set of dimensions.
+owners:
+- bewu@mozilla.com
+labels:
+  incremental: true
+  schedule: hourly # todo: datetime.timedelta(hours=3)
+scheduling:
+  dag_name: bqetl_error_aggregates
+  depends_on_past: false
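The scheduling block is what attaches this query to the bqetl_error_aggregates DAG when the DAGs are generated. A minimal sketch of reading it with PyYAML (generic code, not the repo's own metadata loader; the file path is assumed from the sql/<dataset>/<table>_v<version>/ layout):

    import yaml

    # assumed location of the metadata file for this query
    path = "sql/telemetry_derived/error_aggregates_v1/metadata.yaml"
    with open(path) as f:
        metadata = yaml.safe_load(f)

    scheduling = metadata.get("scheduling", {})
    print(scheduling["dag_name"])                    # bqetl_error_aggregates
    print(scheduling.get("depends_on_past", False))  # False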