Generate error aggregates DAG

Parent: 368020f37d
Commit: d5822b952d
@@ -109,11 +109,13 @@ class Dag:
         Validate the schedule_interval format.
         Schedule intervals can be either in CRON format or one of:
         @once, @hourly, @daily, @weekly, @monthly, @yearly
+        or a timedelta []d[]h[]m
         """
         # https://stackoverflow.com/questions/14203122/create-a-regular-expression-for-cron-statement
         pattern = re.compile(
             r"^(once|hourly|daily|weekly|monthly|yearly|"
-            r"((((\d+,)+\d+|(\d+(\/|-)\d+)|\d+|\*) ?){5,7}))$"
+            r"((((\d+,)+\d+|(\d+(\/|-)\d+)|\d+|\*) ?){5,7})|"
+            r"((\d+h)?(\d+m)?(\d+s)?))$"
         )
         if not pattern.match(value):
            raise ValueError(f"Invalid schedule_interval {value}.")

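Illustrative sketch (not part of the commit): a quick check of what the extended pattern accepts, using the same regex as above.

    import re

    # CRON presets, CRON expressions, or an [n]h[n]m[n]s timedelta string
    pattern = re.compile(
        r"^(once|hourly|daily|weekly|monthly|yearly|"
        r"((((\d+,)+\d+|(\d+(\/|-)\d+)|\d+|\*) ?){5,7})|"
        r"((\d+h)?(\d+m)?(\d+s)?))$"
    )

    assert pattern.match("hourly")         # preset
    assert pattern.match("0 * * * *")      # CRON expression
    assert pattern.match("3h")             # timedelta, hours only
    assert pattern.match("1h30m")          # timedelta, hours and minutes
    assert not pattern.match("every day")  # rejected
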
@@ -7,6 +7,7 @@ from datetime import datetime, timedelta
 import re

 from bigquery_etl import query_scheduling
+from bigquery_etl.query_scheduling.utils import is_timedelta_string


 def format_schedule_interval(interval):

@@ -15,10 +16,10 @@ def format_schedule_interval(interval):
     presets = ["once", "hourly", "daily", "weekly", "monthly", "yearly"]

     if interval in presets:
-        return "@" + interval
+        return "'@" + interval + "'"

     # the interval should be a CRON expression
-    return interval
+    return "'" + interval + "'"


 def format_attr(d, attribute, formatter_name):

@@ -54,4 +55,4 @@ def format_timedelta(timdelta_string):
         if param:
             time_params[name] = int(param)

-    return timedelta(**time_params)
+    return repr(timedelta(**time_params))

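Illustrative sketch (not part of the commit): returning repr() instead of the timedelta itself makes the value render as valid Python source when the Jinja template writes out the DAG file (under Python 3.7+, where timedelta reprs use keyword arguments).

    from datetime import timedelta

    # What the generated DAG file will contain for the config values below:
    repr(timedelta(hours=3))     # 'datetime.timedelta(seconds=10800)'
    repr(timedelta(minutes=20))  # 'datetime.timedelta(seconds=1200)'
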
@@ -48,22 +48,24 @@ parser.add_argument(
 standard_args.add_log_level(parser)


-def setup_telemetry_airflow():
-    """
-    Download the telemetry-airflow repository to a temporary directory and
-    copy generated DAGs to dags/ folder.
+# This will be needed later for determining external dependencies
+#
+# def setup_telemetry_airflow():
+#     """
+#     Download the telemetry-airflow repository to a temporary directory and
+#     copy generated DAGs to dags/ folder.

-    Returns the directory all Airflow DAGs are stored in locally.
-    """
-    tmp_dir = tempfile.gettempdir() + "/telemetry-airflow/"
+#     Returns the directory all Airflow DAGs are stored in locally.
+#     """
+#     tmp_dir = tempfile.gettempdir() + "/telemetry-airflow/"

-    # the repository can only be cloned into an empty directory
-    shutil.rmtree(tmp_dir)
+#     # the repository can only be cloned into an empty directory
+#     shutil.rmtree(tmp_dir)

-    Repo.clone_from(TELEMETRY_AIRFLOW_GITHUB, tmp_dir)
+#     Repo.clone_from(TELEMETRY_AIRFLOW_GITHUB, tmp_dir)

-    airflow_dag_dir = tmp_dir + "/dags"
-    return airflow_dag_dir
+#     airflow_dag_dir = tmp_dir + "/dags"
+#     return airflow_dag_dir


 def get_dags(sql_dir, dags_config):

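Illustrative sketch for when setup_telemetry_airflow is re-enabled (not part of the commit; TELEMETRY_AIRFLOW_GITHUB is assumed to be the repository URL defined elsewhere in this module): shutil.rmtree raises FileNotFoundError when the directory does not exist yet, so a fresh-clone helper may want to guard against that.

    import shutil
    import tempfile

    from git import Repo  # GitPython

    def clone_fresh(repo_url, name="telemetry-airflow"):
        """Clone repo_url into an empty temp directory and return its dags/ path."""
        tmp_dir = tempfile.gettempdir() + "/" + name + "/"
        # ignore_errors avoids FileNotFoundError on a first run
        shutil.rmtree(tmp_dir, ignore_errors=True)
        Repo.clone_from(repo_url, tmp_dir)
        return tmp_dir + "dags"
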
@@ -112,8 +114,6 @@ def main():
     dags = get_dags(args.sql_dir, args.dags_config)
     dags.to_airflow_dags(dags_output_dir, client)

-    setup_telemetry_airflow()
-

 if __name__ == "__main__":
     main()

@@ -44,7 +44,7 @@ class UnscheduledTask(Exception):
 class Task:
     """Representation of a task scheduled in Airflow."""

-    dag_name: str
+    dag_name: str = attr.ib()
     query_file: str
     owner: str = attr.ib()
     email: List[str] = attr.ib([])

@@ -73,6 +73,15 @@ class Task:
                 "Dates should be specified as YYYY-MM-DD."
             )

+    @dag_name.validator
+    def validate_dag_name(self, attribute, value):
+        """Validate the DAG name."""
+        dag_name_pattern = re.compile("^bqetl_.+$")
+        if not dag_name_pattern.match(value):
+            raise ValueError(
+                f"Invalid DAG name {value} for task. Name must start with 'bqetl_'."
+            )
+
     def __attrs_post_init__(self):
         """Extract information from the query file name."""
         query_file_re = re.search(QUERY_FILE_RE, self.query_file)

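For context, an illustrative sketch (not from the diff): attrs only exposes the .validator decorator on attributes declared with attr.ib(), which is why the bare annotation dag_name: str was not sufficient above.

    import re

    import attr

    @attr.s(auto_attribs=True)
    class Example:
        dag_name: str = attr.ib()

        @dag_name.validator
        def check_name(self, attribute, value):
            if not re.match("^bqetl_.+$", value):
                raise ValueError(f"Invalid DAG name {value}.")

    Example("bqetl_error_aggregates")  # passes validation
    # Example("error_aggregates")     # raises ValueError
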
@@ -100,7 +109,7 @@ class Task:
         if metadata is None:
             metadata = Metadata.of_sql_file(query_file)

-        if metadata.scheduling == {}:
+        if metadata.scheduling == {} or "dag_name" not in metadata.scheduling:
             raise UnscheduledTask(
                 f"Metadata for {query_file} does not contain scheduling information."
             )

@@ -11,7 +11,7 @@ default_args = {{
     format_attr("retry_delay", "format_timedelta")
 }}

-with DAG('{{ name }}', default_args=default_args, schedule_interval='{{ schedule_interval | format_schedule_interval }}') as dag:
+with DAG('{{ name }}', default_args=default_args, schedule_interval={{ schedule_interval | format_timedelta }}) as dag:
 {% for task in tasks %}
     {{ task.task_name }} = bigquery_etl_query(
         destination_table='{{ task.table }}',

@@ -22,12 +22,12 @@ with DAG('{{ name }}', default_args=default_args, schedule_interval='{{ schedule
         email={{ task.email }},
         {%+ endif -%}
         {%+ if task.start_date -%}
-        start_date={{ task.start_date | format_date }}
+        start_date={{ task.start_date | format_date }},
         {%+ endif -%}
         depends_on_past={{ task.depends_on_past }},
         dag=dag,
     )
-    {% for dependency in task.dependencies %}
+    {% for dependency in task.dependencies -%}
     wait_for_{{ dependency.task_name }} = ExternalTaskSensor(
         task_id='wait_for_{{ dependency.task_name }}',
         external_dag_id='{{ dependency.dag_name }}',

@@ -36,5 +36,5 @@ with DAG('{{ name }}', default_args=default_args, schedule_interval='{{ schedule
     )

     {{ task.task_name }}.set_upstream(wait_for_{{ dependency.task_name }})
-    {% endfor %}
-{% endfor %}
+    {% endfor -%}
+{% endfor -%}

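Side note, an illustrative sketch (not part of the commit): the trailing -%} enables Jinja's whitespace control, stripping the newline that follows the tag so the generated Python does not accumulate blank lines between loop iterations.

    from jinja2 import Template

    loose = Template("{% for x in [1, 2] %}\nline {{ x }}\n{% endfor %}")
    tight = Template("{% for x in [1, 2] -%}\nline {{ x }}\n{% endfor %}")

    print(repr(loose.render()))  # '\nline 1\n\nline 2\n' -- blank line per iteration
    print(repr(tight.render()))  # 'line 1\nline 2\n'
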
@@ -1,7 +1,9 @@
 bqetl_error_aggregates:
-  schedule_interval: hourly # todo: datetime.timedelta(hours=3)
+  schedule_interval: 3h
   default_args:
     owner: bewu@mozilla.com
     email: ['telemetry-alerts@mozilla.com', 'bewu@mozilla.com', 'wlachance@mozilla.com']
     start_date: '2019-11-01'
+    retries: 1
+    retry_delay: 20m
     depends_on_past: false

@@ -2,17 +2,19 @@

 from airflow import DAG
 from airflow.operators.sensors import ExternalTaskSensor
-from datetime import datetime, timedelta
+import datetime
 from utils.gcp import bigquery_etl_query

-with DAG("bqetl_error_aggregates", default_args={'owner': 'bewu@mozilla.com', 'email': ['telemetry-alerts@mozilla.com', 'bewu@mozilla.com', 'wlachance@mozilla.com'], 'start_date': '2019-11-01', 'retries': 1}, schedule_interval="hourly") as dag:
+default_args = {'owner': 'bewu@mozilla.com', 'email': ['telemetry-alerts@mozilla.com', 'bewu@mozilla.com', 'wlachance@mozilla.com'], 'depends_on_past': False, 'start_date': datetime.datetime(2019, 11, 1, 0, 0), 'retry_delay': 'datetime.timedelta(seconds=1200)', 'email_on_failure': True, 'email_on_retry': True, 'retries': 1}
+
+with DAG('bqetl_error_aggregates', default_args=default_args, schedule_interval=datetime.timedelta(seconds=10800)) as dag:

     telemetry_derived__error_aggregates__v1 = bigquery_etl_query(
-        destination_table="error_aggregates",
-        dataset_id="telemetry_derived",
+        destination_table='error_aggregates',
+        dataset_id='telemetry_derived',
         project_id='moz-fx-data-shared-prod',
         owner='bewu@mozilla.com',
         email=['bewu@mozilla.com'],
         depends_on_past=False,
         dag=dag,
     )

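Sanity check (illustrative, not part of the commit): the generated schedule interval and retry delay correspond to the 3h and 20m values in dags.yaml.

    import datetime

    assert datetime.timedelta(hours=3) == datetime.timedelta(seconds=10800)
    assert datetime.timedelta(minutes=20) == datetime.timedelta(seconds=1200)
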
@@ -5,7 +5,7 @@ owners:
 - bewu@mozilla.com
 labels:
   incremental: true
-  schedule: hourly # todo: datetime.timedelta(hours=3)
+  schedule: 3h
 scheduling:
   dag_name: bqetl_error_aggregates
   depends_on_past: false