diff --git a/bigquery_etl/query_scheduling/dag.py b/bigquery_etl/query_scheduling/dag.py index 793bbd7087..4a8866b1d3 100644 --- a/bigquery_etl/query_scheduling/dag.py +++ b/bigquery_etl/query_scheduling/dag.py @@ -92,7 +92,7 @@ class Dag: name: str = attr.ib() schedule_interval: str = attr.ib() default_args: DagDefaultArgs - tasks: List[Task] = [] + tasks: List[Task] = attr.ib([]) @name.validator def validate_dag_name(self, attribute, value): @@ -122,7 +122,7 @@ class Dag: def add_tasks(self, tasks): """Add tasks to be scheduled as part of the DAG.""" - self.tasks += tasks + self.tasks = self.tasks.copy() + tasks @classmethod def from_dict(cls, d): diff --git a/bigquery_etl/query_scheduling/dag_collection.py b/bigquery_etl/query_scheduling/dag_collection.py index 33614f2350..0d622ffb1d 100644 --- a/bigquery_etl/query_scheduling/dag_collection.py +++ b/bigquery_etl/query_scheduling/dag_collection.py @@ -63,16 +63,14 @@ class DagCollection: def with_tasks(self, tasks): """Assign tasks to their corresponding DAGs.""" - for dag_name, tasks in groupby(tasks, lambda t: t.dag_name): - dag = self.dag_by_name(dag_name) - - if dag is None: + for task in tasks: + if self.dag_by_name(task.dag_name) is None: raise InvalidDag( - f"DAG {dag_name} does not exist in dags.yaml" - "but used in task definition {tasks[0].name}." + f"DAG {task.dag_name} does not exist in dags.yaml" + "but used in task definition {dag_tasks[0].name}." ) else: - dag.add_tasks(tasks) + self.dag_by_name(task.dag_name).add_tasks([task]) return self diff --git a/bigquery_etl/query_scheduling/formatters.py b/bigquery_etl/query_scheduling/formatters.py index 862ab21162..d479c6cf89 100644 --- a/bigquery_etl/query_scheduling/formatters.py +++ b/bigquery_etl/query_scheduling/formatters.py @@ -56,3 +56,11 @@ def format_timedelta(timdelta_string): time_params[name] = int(param) return repr(timedelta(**time_params)) + + +def format_optional_string(val): + """Formats a value that is either None or a string.""" + if val is None: + return "None" + else: + return "'" + val + "'" diff --git a/bigquery_etl/query_scheduling/task.py b/bigquery_etl/query_scheduling/task.py index 9c16f53863..701b3aa899 100644 --- a/bigquery_etl/query_scheduling/task.py +++ b/bigquery_etl/query_scheduling/task.py @@ -5,7 +5,7 @@ import cattr import re import logging from google.cloud import bigquery -from typing import List, Optional +from typing import List, Optional, Union, NewType from bigquery_etl.metadata.parse_metadata import Metadata @@ -14,6 +14,7 @@ from bigquery_etl.query_scheduling.utils import is_date_string, is_email AIRFLOW_TASK_TEMPLATE = "airflow_task.j2" QUERY_FILE_RE = re.compile(r"^.*/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+)_(v[0-9]+)/query\.sql$") +DEFAULT_PROJECT = "moz-fx-data-shared-prod" class TaskParseException(Exception): @@ -40,6 +41,11 @@ class UnscheduledTask(Exception): pass +# date_partition_parameter can be overriden with None or a string +# this type indicates that date_partition_parameter should not be changed +Ignore = NewType("Ignore", None) + + @attr.s(auto_attribs=True) class Task: """Representation of a task scheduled in Airflow.""" @@ -54,6 +60,7 @@ class Task: task_name: str = attr.ib(init=False) depends_on_past: bool = attr.ib(False) start_date: Optional[str] = attr.ib(None) + date_partition_parameter: Union[Optional[str], Ignore] = attr.ib(Ignore) @owner.validator def validate_owner(self, attribute, value): @@ -152,6 +159,7 @@ class Task: job_config = bigquery.QueryJobConfig( dry_run=True, use_query_cache=False, + default_dataset=f"{DEFAULT_PROJECT}.{self.dataset}", query_parameters=[ bigquery.ScalarQueryParameter("submission_date", "DATE", "2019-01-01") ], diff --git a/bigquery_etl/query_scheduling/templates/airflow_dag.j2 b/bigquery_etl/query_scheduling/templates/airflow_dag.j2 index 7fd9746368..c1f9c8c188 100644 --- a/bigquery_etl/query_scheduling/templates/airflow_dag.j2 +++ b/bigquery_etl/query_scheduling/templates/airflow_dag.j2 @@ -24,17 +24,21 @@ with DAG('{{ name }}', default_args=default_args, schedule_interval={{ schedule_ {%+ if task.start_date -%} start_date={{ task.start_date | format_date }}, {%+ endif -%} + {%+ if task.date_partition_parameter == None or task.date_partition_parameter is string -%} + date_partition_parameter={{ task.date_partition_parameter | format_optional_string }}, + {%+ endif -%} depends_on_past={{ task.depends_on_past }}, dag=dag, ) - {% for dependency in task.dependencies -%} - wait_for_{{ dependency.task_name }} = ExternalTaskSensor( - task_id='wait_for_{{ dependency.task_name }}', - external_dag_id='{{ dependency.dag_name }}', - external_task_id='{{ dependency.task_name }}', - dag=dag, - ) + + {% for dependency in task.dependencies %} + wait_for_{{ dependency.task_name }} = ExternalTaskSensor( + task_id='wait_for_{{ dependency.task_name }}', + external_dag_id='{{ dependency.dag_name }}', + external_task_id='{{ dependency.task_name }}', + dag=dag, + ) - {{ task.task_name }}.set_upstream(wait_for_{{ dependency.task_name }}) + {{ task.task_name }}.set_upstream(wait_for_{{ dependency.task_name }}) {% endfor -%} {% endfor -%} \ No newline at end of file diff --git a/dags.yaml b/dags.yaml index c5ab29b072..0fd352f9f2 100644 --- a/dags.yaml +++ b/dags.yaml @@ -7,3 +7,13 @@ bqetl_error_aggregates: retries: 1 retry_delay: 20m depends_on_past: false + +bqetl_kpi_dashboard: + schedule_interval: 45 15 * * * + default_args: + owner: jklukas@mozilla.com + start_date: '2020-05-12' + email: ['telemetry-alerts@mozilla.com', 'jklukas@mozilla.com'] + depends_on_past: false + retry_delay: 10m + retries: 1 diff --git a/dags/bqetl_error_aggregates.py b/dags/bqetl_error_aggregates.py index 7df8f6ded3..44a1f7d99b 100644 --- a/dags/bqetl_error_aggregates.py +++ b/dags/bqetl_error_aggregates.py @@ -18,3 +18,5 @@ with DAG('bqetl_error_aggregates', default_args=default_args, schedule_interval= depends_on_past=False, dag=dag, ) + + \ No newline at end of file diff --git a/dags/bqetl_kpi_dashboard.py b/dags/bqetl_kpi_dashboard.py new file mode 100644 index 0000000000..9611c390df --- /dev/null +++ b/dags/bqetl_kpi_dashboard.py @@ -0,0 +1,54 @@ +# Generated via query_scheduling/generate_airflow_dags + +from airflow import DAG +from airflow.operators.sensors import ExternalTaskSensor +import datetime +from utils.gcp import bigquery_etl_query + +default_args = {'owner': 'jklukas@mozilla.com', 'email': ['telemetry-alerts@mozilla.com', 'jklukas@mozilla.com'], 'depends_on_past': False, 'start_date': datetime.datetime(2020, 5, 12, 0, 0), 'retry_delay': 'datetime.timedelta(seconds=600)', 'email_on_failure': True, 'email_on_retry': True, 'retries': 1} + +with DAG('bqetl_kpi_dashboard', default_args=default_args, schedule_interval=datetime.timedelta(0)) as dag: + + telemetry_derived__smoot_usage_new_profiles__v2 = bigquery_etl_query( + destination_table='smoot_usage_new_profiles', + dataset_id='telemetry_derived', + project_id='moz-fx-data-shared-prod', + owner='jklukas@mozilla.com', + email=['jklukas@mozilla.com'], + depends_on_past=False, + dag=dag, + ) + + + telemetry_derived__smoot_usage_new_profiles_compressed__v2 = bigquery_etl_query( + destination_table='smoot_usage_new_profiles_compressed', + dataset_id='telemetry_derived', + project_id='moz-fx-data-shared-prod', + owner='jklukas@mozilla.com', + email=['jklukas@mozilla.com'], + depends_on_past=False, + dag=dag, + ) + + + wait_for_telemetry_derived__smoot_usage_new_profiles__v2 = ExternalTaskSensor( + task_id='wait_for_telemetry_derived__smoot_usage_new_profiles__v2', + external_dag_id='bqetl_kpi_dashboard', + external_task_id='telemetry_derived__smoot_usage_new_profiles__v2', + dag=dag, + ) + + telemetry_derived__smoot_usage_new_profiles_compressed__v2.set_upstream(wait_for_telemetry_derived__smoot_usage_new_profiles__v2) + + telemetry__firefox_kpi_dashboard__v1 = bigquery_etl_query( + destination_table='firefox_kpi_dashboard', + dataset_id='telemetry', + project_id='moz-fx-data-shared-prod', + owner='jklukas@mozilla.com', + email=['jklukas@mozilla.com', 'telemetry-alerts@mozilla.com'], + date_partition_parameter=None, + depends_on_past=False, + dag=dag, + ) + + \ No newline at end of file diff --git a/sql/telemetry/firefox_kpi_dashboard_v1/metadata.yaml b/sql/telemetry/firefox_kpi_dashboard_v1/metadata.yaml new file mode 100644 index 0000000000..c236b253b4 --- /dev/null +++ b/sql/telemetry/firefox_kpi_dashboard_v1/metadata.yaml @@ -0,0 +1,12 @@ +friendly_name: Firefox KPI dashboard +description: Firefox KPI dashboard +owners: + - jklukas@mozilla.com +labels: + incremental: true + schedule: 45 15 * * * +scheduling: + dag_name: bqetl_kpi_dashboard + depends_on_past: false + date_partition_parameter: null + email: ['telemetry-alerts@mozilla.com', 'jklukas@mozilla.com'] diff --git a/sql/telemetry_derived/smoot_usage_new_profiles_compressed_v2/metadata.yaml b/sql/telemetry_derived/smoot_usage_new_profiles_compressed_v2/metadata.yaml new file mode 100644 index 0000000000..29edd9fe01 --- /dev/null +++ b/sql/telemetry_derived/smoot_usage_new_profiles_compressed_v2/metadata.yaml @@ -0,0 +1,9 @@ +friendly_name: Smoot usage new profiles compressed +description: Smoot usage new profiles compressed +owners: + - jklukas@mozilla.com +labels: + incremental: true + schedule: 45 15 * * * +scheduling: + dag_name: bqetl_kpi_dashboard diff --git a/sql/telemetry_derived/smoot_usage_new_profiles_v2/metadata.yaml b/sql/telemetry_derived/smoot_usage_new_profiles_v2/metadata.yaml new file mode 100644 index 0000000000..3d54e4122d --- /dev/null +++ b/sql/telemetry_derived/smoot_usage_new_profiles_v2/metadata.yaml @@ -0,0 +1,9 @@ +friendly_name: Smoot usage new profiles +description: Smoot usage new profiles +owners: + - jklukas@mozilla.com +labels: + incremental: true + schedule: 45 15 * * * +scheduling: + dag_name: bqetl_kpi_dashboard