diff --git a/bigquery_etl/query_scheduling/dag.py b/bigquery_etl/query_scheduling/dag.py
index e30d23e631..793bbd7087 100644
--- a/bigquery_etl/query_scheduling/dag.py
+++ b/bigquery_etl/query_scheduling/dag.py
@@ -109,11 +109,13 @@ class Dag:
         Validate the schedule_interval format.
         Schedule intervals can be either in CRON format or one of:
         @once, @hourly, @daily, @weekly, @monthly, @yearly
+        or a timedelta []h[]m[]s
         """
         # https://stackoverflow.com/questions/14203122/create-a-regular-expression-for-cron-statement
         pattern = re.compile(
             r"^(once|hourly|daily|weekly|monthly|yearly|"
-            r"((((\d+,)+\d+|(\d+(\/|-)\d+)|\d+|\*) ?){5,7}))$"
+            r"((((\d+,)+\d+|(\d+(\/|-)\d+)|\d+|\*) ?){5,7})|"
+            r"((\d+h)?(\d+m)?(\d+s)?))$"
         )
         if not pattern.match(value):
             raise ValueError(f"Invalid schedule_interval {value}.")
diff --git a/bigquery_etl/query_scheduling/formatters.py b/bigquery_etl/query_scheduling/formatters.py
index a3148ef059..862ab21162 100644
--- a/bigquery_etl/query_scheduling/formatters.py
+++ b/bigquery_etl/query_scheduling/formatters.py
@@ -7,6 +7,7 @@ from datetime import datetime, timedelta
 import re
 
 from bigquery_etl import query_scheduling
+from bigquery_etl.query_scheduling.utils import is_timedelta_string
 
 
 def format_schedule_interval(interval):
@@ -15,10 +16,10 @@
     presets = ["once", "hourly", "daily", "weekly", "monthly", "yearly"]
 
     if interval in presets:
-        return "@" + interval
+        return "'@" + interval + "'"
 
     # the interval should be a CRON expression
-    return interval
+    return "'" + interval + "'"
 
 
 def format_attr(d, attribute, formatter_name):
@@ -54,4 +55,4 @@
         if param:
             time_params[name] = int(param)
 
-    return timedelta(**time_params)
+    return repr(timedelta(**time_params))
diff --git a/bigquery_etl/query_scheduling/generate_airflow_dags.py b/bigquery_etl/query_scheduling/generate_airflow_dags.py
index e0d3021bec..8de9a45065 100644
--- a/bigquery_etl/query_scheduling/generate_airflow_dags.py
+++ b/bigquery_etl/query_scheduling/generate_airflow_dags.py
@@ -48,22 +48,24 @@ parser.add_argument(
 standard_args.add_log_level(parser)
 
 
-def setup_telemetry_airflow():
-    """
-    Download the telemetry-airflow repository to a temporary directory and
-    copy generated DAGs to dags/ folder.
+# This will be needed later for determining external dependencies
+#
+# def setup_telemetry_airflow():
+#     """
+#     Download the telemetry-airflow repository to a temporary directory and
+#     copy generated DAGs to dags/ folder.
 
-    Returns the directory all Airflow DAGs are stored in locally.
-    """
-    tmp_dir = tempfile.gettempdir() + "/telemetry-airflow/"
+#     Returns the directory all Airflow DAGs are stored in locally.
+#     """
+#     tmp_dir = tempfile.gettempdir() + "/telemetry-airflow/"
 
-    # the repository can only be cloned into an empty directory
-    shutil.rmtree(tmp_dir)
+#     # the repository can only be cloned into an empty directory
+#     shutil.rmtree(tmp_dir)
 
-    Repo.clone_from(TELEMETRY_AIRFLOW_GITHUB, tmp_dir)
+#     Repo.clone_from(TELEMETRY_AIRFLOW_GITHUB, tmp_dir)
 
-    airflow_dag_dir = tmp_dir + "/dags"
-    return airflow_dag_dir
+#     airflow_dag_dir = tmp_dir + "/dags"
+#     return airflow_dag_dir
 
 
 def get_dags(sql_dir, dags_config):
@@ -112,8 +114,6 @@ def main():
     dags = get_dags(args.sql_dir, args.dags_config)
     dags.to_airflow_dags(dags_output_dir, client)
 
-    setup_telemetry_airflow()
-
 
 if __name__ == "__main__":
     main()
diff --git a/bigquery_etl/query_scheduling/task.py b/bigquery_etl/query_scheduling/task.py
index bc3c36ea5a..9c16f53863 100644
--- a/bigquery_etl/query_scheduling/task.py
+++ b/bigquery_etl/query_scheduling/task.py
@@ -44,7 +44,7 @@ class UnscheduledTask(Exception):
 class Task:
     """Representation of a task scheduled in Airflow."""
 
-    dag_name: str
+    dag_name: str = attr.ib()
     query_file: str
     owner: str = attr.ib()
     email: List[str] = attr.ib([])
@@ -73,6 +73,15 @@ class Task:
                 "Dates should be specified as YYYY-MM-DD."
             )
 
+    @dag_name.validator
+    def validate_dag_name(self, attribute, value):
+        """Validate the DAG name."""
+        dag_name_pattern = re.compile("^bqetl_.+$")
+        if not dag_name_pattern.match(value):
+            raise ValueError(
+                f"Invalid DAG name {value} for task. Name must start with 'bqetl_'."
+            )
+
     def __attrs_post_init__(self):
         """Extract information from the query file name."""
         query_file_re = re.search(QUERY_FILE_RE, self.query_file)
@@ -100,7 +109,7 @@ class Task:
         if metadata is None:
             metadata = Metadata.of_sql_file(query_file)
 
-        if metadata.scheduling == {}:
+        if metadata.scheduling == {} or "dag_name" not in metadata.scheduling:
             raise UnscheduledTask(
                 f"Metadata for {query_file} does not contain scheduling information."
             )
diff --git a/bigquery_etl/query_scheduling/templates/airflow_dag.j2 b/bigquery_etl/query_scheduling/templates/airflow_dag.j2
index 261be4eac0..7fd9746368 100644
--- a/bigquery_etl/query_scheduling/templates/airflow_dag.j2
+++ b/bigquery_etl/query_scheduling/templates/airflow_dag.j2
@@ -11,7 +11,7 @@ default_args = {{ format_attr("retry_delay", "format_timedelta") }}
 
 
-with DAG('{{ name }}', default_args=default_args, schedule_interval='{{ schedule_interval | format_schedule_interval }}') as dag:
+with DAG('{{ name }}', default_args=default_args, schedule_interval={{ schedule_interval | format_timedelta }}) as dag:
 
 {% for task in tasks %}
     {{ task.task_name }} = bigquery_etl_query(
         destination_table='{{ task.table }}',
@@ -22,12 +22,12 @@ with DAG('{{ name }}', default_args=default_args, schedule_interval='{{ schedule
         email={{ task.email }},
         {%+ endif -%}
         {%+ if task.start_date -%}
-        start_date={{ task.start_date | format_date }}
+        start_date={{ task.start_date | format_date }},
         {%+ endif -%}
+        depends_on_past={{ task.depends_on_past }},
         dag=dag,
     )
-
-    {% for dependency in task.dependencies %}
+    {% for dependency in task.dependencies -%}
     wait_for_{{ dependency.task_name }} = ExternalTaskSensor(
         task_id='wait_for_{{ dependency.task_name }}',
         external_dag_id='{{ dependency.dag_name }}',
@@ -36,5 +36,5 @@ with DAG('{{ name }}', default_args=default_args, schedule_interval='{{ schedule
     )
 
     {{ task.task_name }}.set_upstream(wait_for_{{ dependency.task_name }})
-    {% endfor %}
-{% endfor %}
+    {% endfor -%}
+{% endfor -%}
\ No newline at end of file
diff --git a/dags.yaml b/dags.yaml
index ed16d24a9b..c5ab29b072 100644
--- a/dags.yaml
+++ b/dags.yaml
@@ -1,7 +1,9 @@
 bqetl_error_aggregates:
-  schedule_interval: hourly # todo: datetime.timedelta(hours=3)
+  schedule_interval: 3h
   default_args:
     owner: bewu@mozilla.com
     email: ['telemetry-alerts@mozilla.com', 'bewu@mozilla.com', 'wlachance@mozilla.com']
     start_date: '2019-11-01'
     retries: 1
+    retry_delay: 20m
+    depends_on_past: false
diff --git a/dags/bqetl_error_aggregates.py b/dags/bqetl_error_aggregates.py
index fbdbf15943..7df8f6ded3 100644
--- a/dags/bqetl_error_aggregates.py
+++ b/dags/bqetl_error_aggregates.py
@@ -2,17 +2,19 @@ from airflow import DAG
 from airflow.operators.sensors import ExternalTaskSensor
 
-from datetime import datetime, timedelta
+import datetime
 
 from utils.gcp import bigquery_etl_query
 
+default_args = {'owner': 'bewu@mozilla.com', 'email': ['telemetry-alerts@mozilla.com', 'bewu@mozilla.com', 'wlachance@mozilla.com'], 'depends_on_past': False, 'start_date': datetime.datetime(2019, 11, 1, 0, 0), 'retry_delay': 'datetime.timedelta(seconds=1200)', 'email_on_failure': True, 'email_on_retry': True, 'retries': 1}
 
-with DAG("bqetl_error_aggregates", default_args={'owner': 'bewu@mozilla.com', 'email': ['telemetry-alerts@mozilla.com', 'bewu@mozilla.com', 'wlachance@mozilla.com'], 'start_date': '2019-11-01', 'retries': 1}, schedule_interval="hourly") as dag:
+with DAG('bqetl_error_aggregates', default_args=default_args, schedule_interval=datetime.timedelta(seconds=10800)) as dag:
     telemetry_derived__error_aggregates__v1 = bigquery_etl_query(
-        destination_table="error_aggregates",
-        dataset_id="telemetry_derived",
+        destination_table='error_aggregates',
+        dataset_id='telemetry_derived',
         project_id='moz-fx-data-shared-prod',
+        owner='bewu@mozilla.com',
+        email=['bewu@mozilla.com'],
+        depends_on_past=False,
         dag=dag,
     )
 
-
-
diff --git a/sql/telemetry_derived/error_aggregates_v1/metadata.yaml b/sql/telemetry_derived/error_aggregates_v1/metadata.yaml
index 760f0ea5b6..d56715c2b2 100644
--- a/sql/telemetry_derived/error_aggregates_v1/metadata.yaml
+++ b/sql/telemetry_derived/error_aggregates_v1/metadata.yaml
@@ -5,7 +5,7 @@ owners:
 - bewu@mozilla.com
 labels:
   incremental: true
-  schedule: hourly # todo: datetime.timedelta(hours=3)
+  schedule: 3h
 scheduling:
   dag_name: bqetl_error_aggregates
   depends_on_past: false
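
For illustration, a minimal standalone sketch of how the timedelta-style schedule intervals introduced above ("3h", "20m") can be validated and converted, assuming the same []h[]m[]s grammar as the new alternative in the dag.py regex. The helper name parse_timedelta_string is hypothetical; in the diff itself the conversion is handled by format_timedelta (and is_timedelta_string), whose full bodies are not shown here.

import re
from datetime import timedelta

# Same grammar as the alternative added to the schedule_interval regex in dag.py:
# optional hours, minutes and seconds parts, e.g. "3h", "20m", "1h30m".
TIMEDELTA_RE = re.compile(r"^(?:(?P<hours>\d+)h)?(?:(?P<minutes>\d+)m)?(?:(?P<seconds>\d+)s)?$")


def parse_timedelta_string(value):
    """Convert a timedelta string such as '3h' or '20m' into a datetime.timedelta."""
    match = TIMEDELTA_RE.match(value)
    if not value or not match:
        raise ValueError(f"Invalid timedelta string {value}.")
    parts = {name: int(num) for name, num in match.groupdict().items() if num}
    return timedelta(**parts)


# "3h" is the schedule_interval in dags.yaml and renders as timedelta(seconds=10800)
# in the generated DAG; "20m" is the retry_delay (1200 seconds).
assert parse_timedelta_string("3h") == timedelta(seconds=10800)
assert parse_timedelta_string("20m") == timedelta(seconds=1200)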
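
Similarly, a small sketch of the DAG name check added in task.py, showing only the regex behavior (the attrs validator wiring is as in the diff above):

import re

# Same pattern as Task.validate_dag_name: DAG names must be prefixed with "bqetl_".
dag_name_pattern = re.compile("^bqetl_.+$")

assert dag_name_pattern.match("bqetl_error_aggregates") is not None  # accepted
assert dag_name_pattern.match("error_aggregates") is None            # rejected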