KPI dashboard generated Airflow DAG

This commit is contained in:
Anna Scholtz 2020-05-21 17:01:42 -07:00
Родитель d5822b952d
Коммит e7b2b56c01
11 изменённых файлов: 132 добавлений и 18 удалений

Просмотреть файл

@ -92,7 +92,7 @@ class Dag:
name: str = attr.ib()
schedule_interval: str = attr.ib()
default_args: DagDefaultArgs
tasks: List[Task] = []
tasks: List[Task] = attr.ib([])
@name.validator
def validate_dag_name(self, attribute, value):
@ -122,7 +122,7 @@ class Dag:
def add_tasks(self, tasks):
"""Add tasks to be scheduled as part of the DAG."""
self.tasks += tasks
self.tasks = self.tasks.copy() + tasks
@classmethod
def from_dict(cls, d):

Просмотреть файл

@ -63,16 +63,14 @@ class DagCollection:
def with_tasks(self, tasks):
"""Assign tasks to their corresponding DAGs."""
for dag_name, tasks in groupby(tasks, lambda t: t.dag_name):
dag = self.dag_by_name(dag_name)
if dag is None:
for task in tasks:
if self.dag_by_name(task.dag_name) is None:
raise InvalidDag(
f"DAG {dag_name} does not exist in dags.yaml"
"but used in task definition {tasks[0].name}."
f"DAG {task.dag_name} does not exist in dags.yaml"
"but used in task definition {dag_tasks[0].name}."
)
else:
dag.add_tasks(tasks)
self.dag_by_name(task.dag_name).add_tasks([task])
return self

Просмотреть файл

@ -56,3 +56,11 @@ def format_timedelta(timdelta_string):
time_params[name] = int(param)
return repr(timedelta(**time_params))
def format_optional_string(val):
"""Formats a value that is either None or a string."""
if val is None:
return "None"
else:
return "'" + val + "'"

Просмотреть файл

@ -5,7 +5,7 @@ import cattr
import re
import logging
from google.cloud import bigquery
from typing import List, Optional
from typing import List, Optional, Union, NewType
from bigquery_etl.metadata.parse_metadata import Metadata
@ -14,6 +14,7 @@ from bigquery_etl.query_scheduling.utils import is_date_string, is_email
AIRFLOW_TASK_TEMPLATE = "airflow_task.j2"
QUERY_FILE_RE = re.compile(r"^.*/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+)_(v[0-9]+)/query\.sql$")
DEFAULT_PROJECT = "moz-fx-data-shared-prod"
class TaskParseException(Exception):
@ -40,6 +41,11 @@ class UnscheduledTask(Exception):
pass
# date_partition_parameter can be overriden with None or a string
# this type indicates that date_partition_parameter should not be changed
Ignore = NewType("Ignore", None)
@attr.s(auto_attribs=True)
class Task:
"""Representation of a task scheduled in Airflow."""
@ -54,6 +60,7 @@ class Task:
task_name: str = attr.ib(init=False)
depends_on_past: bool = attr.ib(False)
start_date: Optional[str] = attr.ib(None)
date_partition_parameter: Union[Optional[str], Ignore] = attr.ib(Ignore)
@owner.validator
def validate_owner(self, attribute, value):
@ -152,6 +159,7 @@ class Task:
job_config = bigquery.QueryJobConfig(
dry_run=True,
use_query_cache=False,
default_dataset=f"{DEFAULT_PROJECT}.{self.dataset}",
query_parameters=[
bigquery.ScalarQueryParameter("submission_date", "DATE", "2019-01-01")
],

Просмотреть файл

@ -24,17 +24,21 @@ with DAG('{{ name }}', default_args=default_args, schedule_interval={{ schedule_
{%+ if task.start_date -%}
start_date={{ task.start_date | format_date }},
{%+ endif -%}
{%+ if task.date_partition_parameter == None or task.date_partition_parameter is string -%}
date_partition_parameter={{ task.date_partition_parameter | format_optional_string }},
{%+ endif -%}
depends_on_past={{ task.depends_on_past }},
dag=dag,
)
{% for dependency in task.dependencies -%}
wait_for_{{ dependency.task_name }} = ExternalTaskSensor(
task_id='wait_for_{{ dependency.task_name }}',
external_dag_id='{{ dependency.dag_name }}',
external_task_id='{{ dependency.task_name }}',
dag=dag,
)
{% for dependency in task.dependencies %}
wait_for_{{ dependency.task_name }} = ExternalTaskSensor(
task_id='wait_for_{{ dependency.task_name }}',
external_dag_id='{{ dependency.dag_name }}',
external_task_id='{{ dependency.task_name }}',
dag=dag,
)
{{ task.task_name }}.set_upstream(wait_for_{{ dependency.task_name }})
{{ task.task_name }}.set_upstream(wait_for_{{ dependency.task_name }})
{% endfor -%}
{% endfor -%}

Просмотреть файл

@ -7,3 +7,13 @@ bqetl_error_aggregates:
retries: 1
retry_delay: 20m
depends_on_past: false
bqetl_kpi_dashboard:
schedule_interval: 45 15 * * *
default_args:
owner: jklukas@mozilla.com
start_date: '2020-05-12'
email: ['telemetry-alerts@mozilla.com', 'jklukas@mozilla.com']
depends_on_past: false
retry_delay: 10m
retries: 1

Просмотреть файл

@ -18,3 +18,5 @@ with DAG('bqetl_error_aggregates', default_args=default_args, schedule_interval=
depends_on_past=False,
dag=dag,
)

Просмотреть файл

@ -0,0 +1,54 @@
# Generated via query_scheduling/generate_airflow_dags
from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor
import datetime
from utils.gcp import bigquery_etl_query
default_args = {'owner': 'jklukas@mozilla.com', 'email': ['telemetry-alerts@mozilla.com', 'jklukas@mozilla.com'], 'depends_on_past': False, 'start_date': datetime.datetime(2020, 5, 12, 0, 0), 'retry_delay': 'datetime.timedelta(seconds=600)', 'email_on_failure': True, 'email_on_retry': True, 'retries': 1}
with DAG('bqetl_kpi_dashboard', default_args=default_args, schedule_interval=datetime.timedelta(0)) as dag:
telemetry_derived__smoot_usage_new_profiles__v2 = bigquery_etl_query(
destination_table='smoot_usage_new_profiles',
dataset_id='telemetry_derived',
project_id='moz-fx-data-shared-prod',
owner='jklukas@mozilla.com',
email=['jklukas@mozilla.com'],
depends_on_past=False,
dag=dag,
)
telemetry_derived__smoot_usage_new_profiles_compressed__v2 = bigquery_etl_query(
destination_table='smoot_usage_new_profiles_compressed',
dataset_id='telemetry_derived',
project_id='moz-fx-data-shared-prod',
owner='jklukas@mozilla.com',
email=['jklukas@mozilla.com'],
depends_on_past=False,
dag=dag,
)
wait_for_telemetry_derived__smoot_usage_new_profiles__v2 = ExternalTaskSensor(
task_id='wait_for_telemetry_derived__smoot_usage_new_profiles__v2',
external_dag_id='bqetl_kpi_dashboard',
external_task_id='telemetry_derived__smoot_usage_new_profiles__v2',
dag=dag,
)
telemetry_derived__smoot_usage_new_profiles_compressed__v2.set_upstream(wait_for_telemetry_derived__smoot_usage_new_profiles__v2)
telemetry__firefox_kpi_dashboard__v1 = bigquery_etl_query(
destination_table='firefox_kpi_dashboard',
dataset_id='telemetry',
project_id='moz-fx-data-shared-prod',
owner='jklukas@mozilla.com',
email=['jklukas@mozilla.com', 'telemetry-alerts@mozilla.com'],
date_partition_parameter=None,
depends_on_past=False,
dag=dag,
)

Просмотреть файл

@ -0,0 +1,12 @@
friendly_name: Firefox KPI dashboard
description: Firefox KPI dashboard
owners:
- jklukas@mozilla.com
labels:
incremental: true
schedule: 45 15 * * *
scheduling:
dag_name: bqetl_kpi_dashboard
depends_on_past: false
date_partition_parameter: null
email: ['telemetry-alerts@mozilla.com', 'jklukas@mozilla.com']

Просмотреть файл

@ -0,0 +1,9 @@
friendly_name: Smoot usage new profiles compressed
description: Smoot usage new profiles compressed
owners:
- jklukas@mozilla.com
labels:
incremental: true
schedule: 45 15 * * *
scheduling:
dag_name: bqetl_kpi_dashboard

Просмотреть файл

@ -0,0 +1,9 @@
friendly_name: Smoot usage new profiles
description: Smoot usage new profiles
owners:
- jklukas@mozilla.com
labels:
incremental: true
schedule: 45 15 * * *
scheduling:
dag_name: bqetl_kpi_dashboard